Into

Remi Forax forax at univ-mlv.fr
Sat Dec 22 09:29:40 PST 2012


On 12/22/2012 06:16 PM, Brian Goetz wrote:
> Right. You want to do the upstream stuff in parallel, then you want to 
> do the downstream stuff (a) serially, (b) in the current thread, and 
> probably (c) in encounter order.
>
> So, assume for sake of discussion that we have some form of .toList(), 
> whether as a "native" operation or some sort of 
> reduce/combine/tabulate.  Then you can say:
>
>   parallelStream()...toList().forEach(...)
>
> and the list-building will happen in parallel and then forEach can 
> help sequentially.
>
> Given that, is there any reason left for sequential()?

You also need a toSet(), and yes, in that case you don't need 
sequential(); that's better because it's more explicit.
But there are still a lot of cases where you are sequential and you want 
to control the destination collection;
that's why we need into().
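For reference, the destination-control use case Rémi describes ended up being spelled with a collector in the API that finally shipped in Java 8 (into() itself did not survive): Collectors.toCollection lets the caller pick the target collection type. A minimal sketch:

```java
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DestinationControl {
    // Instead of a general-purpose into(target), the caller names the
    // destination type; here a TreeSet supplies sorting and deduplication.
    static TreeSet<String> sortedNames() {
        return Stream.of("remi", "brian", "joe", "brian")
                     .collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        System.out.println(sortedNames()); // [brian, joe, remi]
    }
}
```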

I don't think it's a good idea to oppose toList() and into(); both have 
their purposes.

Rémi

>
>
>
> On 12/22/2012 11:55 AM, Joe Bowbeer wrote:
>> The main use case is sequential().forEach(), which inserts any ol'
>> for-loop into a computation.
>>
>> On Dec 21, 2012 1:48 PM, "Brian Goetz" <brian.goetz at oracle.com
>> <mailto:brian.goetz at oracle.com>> wrote:
>>
>>     If we get rid of into(), and replace it with an explicit reduction
>>     [1], then we may be able to get rid of sequential() too.
>>
>>     The primary use case for sequential() was for using non-thread-safe
>>     Collections as an into() target.  The convoluted into() turns the
>>     problem around on the target, calling target.addAll(stream) on it,
>>     and the typical implementation of into() is like the one in 
>> Collection:
>>
>>          default void addAll(Stream<? extends E> stream) {
>>              if (stream.isParallel())
>>                  stream = stream.sequential();
>>              stream.forEach(this::add);
>>          }
>>
>>     or more compactly
>>
>>          default void addAll(Stream<? extends E> stream) {
>>          stream.sequential().forEach(this::add);
>>          }
>>
>>     since sequential() is now a no-op on sequential streams.
>>
>>     The code looks pretty, but the implementation is not; sequential()
>>     is a barrier, meaning you have to stop and collect all the elements
>>     into a temporary tree, and then dump them into the target. But it
>>     is not obvious that it is a barrier, so people will be surprised.
>>       (And on infinite streams, it's bad.)
>>
>>     What used to be implicit in sequential() can now be made explicit 
>> with:
>>
>>        if (stream.isParallel())
>>           stream = 
>> stream...whateverWeCallOrderPreservingInto().stream()
>>
>>     That offers similar semantics and at least as good performance,
>>     while also being more transparent and requiring one fewer weird
>>     stream operation.
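In the terms of the API as it eventually shipped, the "parallel upstream, sequential in-order downstream" pattern under discussion is spelled as an explicit collect() followed by forEach on the result (class and data here are illustrative, not from the mail):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelThenSequential {
    static List<Integer> incremented() {
        // The map runs in parallel; collect() is an order-preserving
        // reduction, so no hidden barrier op is needed downstream.
        return IntStream.rangeClosed(1, 5)
                        .parallel()
                        .map(e -> e + 1)
                        .boxed()
                        .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // forEach on the materialized List runs sequentially,
        // in the current thread, in encounter order.
        incremented().forEach(System.out::println);
    }
}
```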
>>
>>     (I don't yet see the way to getting rid of .parallel(), but we can
>>     possibly move it out of Stream and into a static method
>>     Streams.parallel(stream), at some loss of discoverability. But we
>>     can discuss.)
>>
>>
>>     [1] Actually, we have to replace it with two explicit reductions, or
>>     more precisely, a reduction and a for-eaching.  One is the pure
>>     reduction case that involves merging, and is suitable for
>>     non-thread-safe collections (and required if order preservation is
>>     desired); the other is the concurrent case, where we bombard a
>>     concurrent collection with puts and hope it manages to sort them
>>     out. The two are semantically very different; one is a reduce and
>>     the other is a forEach, and so they should have different
>>     manifestations in the code.  Though there are really no concurrent
>>     Collections right now anyway (you could fake a concurrent Set
>>     with a concurrent Map.)
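The two reductions described in [1] both have direct counterparts in the final Java 8 API; a sketch of each, including faking a concurrent Set with a concurrent Map as the footnote suggests (class and sample data are illustrative):

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TwoReductions {
    // (1) The pure merging reduce: each leaf accumulates into its own
    // non-thread-safe list; sibling lists are merged up the tree.
    // Safe for ordinary collections, and encounter order is preserved.
    static List<Integer> byReduce() {
        return Stream.of(3, 1, 2).parallel().collect(Collectors.toList());
    }

    // (2) The concurrent case: bombard a thread-safe target with adds
    // from forEach and let it sort them out. No order guarantee.
    static Set<Integer> byForEach() {
        Set<Integer> target =
                Collections.newSetFromMap(new ConcurrentHashMap<>());
        Stream.of(3, 1, 2).parallel().forEach(target::add);
        return target;
    }

    public static void main(String[] args) {
        System.out.println(byReduce());  // [3, 1, 2] — encounter order kept
        System.out.println(byForEach()); // the same elements, any order
    }
}
```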
>>
>>
>>
>>     On 12/21/2012 12:50 PM, Brian Goetz wrote:
>>
>>         I'm starting to dislike "into".
>>
>>         First, it's the only stream method which retains mutable state
>>         from the
>>         user.  That's not great.
>>
>>         Second, the parallel story is bad.  People are going to write
>>
>>             list.parallel(e -> e+1).into(new ArrayList<>());
>>
>>         which will do a whole lot of trivial computation in parallel,
>>         wait on
>>         the barrier implicit in sequential(), and then do an O(n) serial
>>         thing.
>>
>>         Third, the semantics are weird; we do this clever trick where
>>         collections have to decide whether to do insertion in serial or
>>         parallel.  But as we all learned from Spinal Tap, there's a fine
>>         line
>>         between clever and stupid.
>>
>>         Instead, we could treat this like a mutable reduce, where 
>> leaves are
>>         reduced to a List, and lists are merged as we go up the tree.
>>           Even with
>>         dumb merging it is still going to be much faster than what we've
>>         got now;
>>         no barrier, no buffer the whole thing and copy, and the worst 
>> serial
>>         step is O(n/2) instead of O(n).  So probably 3x better just by
>>         improving
>>         the serial fractions.  But with a smarter combination step, we
>>         can do
>>         better still.  If we have a "concatenated list view" 
>> operation (List
>>         concat(List a, List b)) that returns a read-only, conc-tree
>>         representation, then the big serial stage goes away.
>>
>>         And, of course, building atop reduce makes the whole thing 
>> simpler;
>>         there are fewer ops that have their own distinct semantics, 
>> and the
>>         semantics of into() is about as weird as you get.
>>
>>
>>         Now that the tabulators framework gets users comfortable with 
>> the
>>         explicit choice between functional and concurrent aggregation 
>> for
>>         tabulation, it is a much shorter hop to get there.  So let's
>>         build on
>>         that and find some sort of way to surface mutable and concurrent
>>         versions of "into".  (Currently we have no good concurrent
>>         list-shaped
>>         collections, but hopefully that changes.)
>>
>>         Something like:
>>
>>             Stream.tabulate(collector(ArrayList::new))
>>             Stream.tabulate(concurrentCollector(ConcurrentFooList::new))
>>
>>         Maybe with some rename of tabulate.
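Whatever name tabulate ended up with, the collector(ArrayList::new) idea amounts to bundling a supplier, an accumulator, and a combiner into one object — which is what Collector.of does in the shipped API. A hedged sketch of the functional (merging) variant; the toArrayList name is mine, not from the mail:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class CollectorSketch {
    // The "functional aggregation" flavor: per-leaf ArrayLists merged
    // with addAll, suitable for non-thread-safe, order-preserving targets.
    static <T> Collector<T, List<T>, List<T>> toArrayList() {
        return Collector.of(
                ArrayList::new,                        // supplier
                List::add,                             // accumulator
                (a, b) -> { a.addAll(b); return a; }); // combiner
    }

    public static void main(String[] args) {
        System.out.println(Stream.of("a", "b", "c").collect(toArrayList()));
    }
}
```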
>>
>>         I think there's a small reorganization of naming lurking here
>>         (involving
>>         tabulate, Tabulator, ConcurrentTabulator, MutableReducer,
>>         reduce) that
>>         recasts into() either as an explicit functional or concurrent
>>         tabulation.  And one more tricky+slow special-purpose op bites
>>         the dust,
>>         in favor of something that builds on our two favorite
>>         primitives, fold
>>         (order-preserving) and forEach (not order-preserving.)
>>
>>



More information about the lambda-libs-spec-observers mailing list