Zipping Streams...
Brent Walker
brenthwalker at gmail.com
Mon Jan 13 10:25:21 PST 2014
Thank you for pointing out the error of our ways. Here is your code,
updated so that it compiles against the current state of the API
(StreamSupport.parallelStream() no longer exists, and neither does
Streams.NONE). Since Streams.NONE is gone, I modified tryAdvance() to
use the continuation-passing style we had in our solution and got rid of
the accept(Object) method. This avoids all the casting, and it lets us
delete the fields "a" and "b" from the Spliterator (which no longer has to
be a Consumer<>). Hopefully HotSpot can see through all the code threading
and inline everything.
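For readers who have not seen the continuation-passing trick before, here is a minimal standalone sketch of the same idea outside a Spliterator subclass. The class and method names (CpsZipDemo, zipAll) are made up for illustration. A one-element boolean array stands in for the mTryAdvanceStatus field, since lambdas cannot assign to local variables.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.function.BiFunction;

public class CpsZipDemo {
    // Pairs up elements by calling right.tryAdvance() from inside the callback
    // passed to left.tryAdvance(), so neither element is parked in a field.
    static <A, B, C> List<C> zipAll(Spliterator<A> left, Spliterator<B> right,
                                    BiFunction<? super A, ? super B, ? extends C> zipper) {
        List<C> out = new ArrayList<>();
        // One-element array: a local that the lambdas are allowed to write to.
        boolean[] advanced = new boolean[1];
        do {
            advanced[0] = false;
            left.tryAdvance(a -> right.tryAdvance(b -> {
                out.add(zipper.apply(a, b));
                advanced[0] = true;
            }));
        } while (advanced[0]);
        return out;
    }

    public static void main(String[] args) {
        // The right side runs out first; the left's "c" is consumed but never paired.
        List<String> zipped = zipAll(Arrays.asList("a", "b", "c").spliterator(),
                                     Arrays.asList(1, 2).spliterator(),
                                     (s, n) -> s + n);
        System.out.println(zipped); // [a1, b2]
    }
}
```

The loop stops at the first call in which the inner lambda never ran, which gives the usual zip semantics of ending with the shorter input.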
Don't you have a place in the JDK where code examples can be published?
This code should really be there. People coming to Java from all over
(Python, Groovy, Ruby, Scala, Haskell, ML, C#, F#) will be looking for it;
brace yourself for another hundred StackOverflow/Reddit threads from
people looking for zip in Java when Java 8 comes out.
===========================================================
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Enclosing class added only so the snippet compiles on its own; the name is illustrative.
public final class StreamZip {

    private static final class ZipperSpliterator<T, U, W>
            extends Spliterators.AbstractSpliterator<W> {
        final Spliterator<T> mLeftSpliter;
        final Spliterator<U> mRightSpliter;
        final BiFunction<? super T, ? super U, ? extends W> mZipper;
        // Set by the innermost lambda when both sides produced an element.
        boolean mTryAdvanceStatus;

        ZipperSpliterator(final Spliterator<T> leftSpliter,
                          final Spliterator<U> rightSpliter,
                          final BiFunction<? super T, ? super U, ? extends W> zipper,
                          final long sizeEst,
                          final int additionalCharacteristics) {
            super(sizeEst, additionalCharacteristics);
            this.mLeftSpliter = leftSpliter;
            this.mRightSpliter = rightSpliter;
            this.mZipper = zipper;
        }

        @Override
        public boolean tryAdvance(Consumer<? super W> action) {
            mTryAdvanceStatus = false;
            // Continuation-passing style: the right side is advanced only from
            // inside the left side's callback, so no element is stored in a field.
            // If the left side yields an element but the right side is exhausted,
            // that left element is consumed and discarded, and traversal ends.
            mLeftSpliter.tryAdvance(leftElem ->
                    mRightSpliter.tryAdvance(rightElem -> {
                        action.accept(mZipper.apply(leftElem, rightElem));
                        mTryAdvanceStatus = true;
                    }));
            return mTryAdvanceStatus;
        }
    }

    public static <T, U, W> Stream<W> zip(final Stream<? extends T> ts,
                                          final Stream<? extends U> us,
                                          final BiFunction<? super T, ? super U, ? extends W> zipper) {
        Objects.requireNonNull(ts);
        Objects.requireNonNull(us);
        Objects.requireNonNull(zipper);
        // Read the parallel flags before calling spliterator(), which is a
        // terminal operation on each input stream.
        final boolean parallel = ts.isParallel() || us.isParallel();
        @SuppressWarnings("unchecked")
        final Spliterator<T> leftSpliter = (Spliterator<T>) ts.spliterator();
        @SuppressWarnings("unchecked")
        final Spliterator<U> rightSpliter = (Spliterator<U>) us.spliterator();

        // Combining loses the DISTINCT and SORTED characteristics; any other
        // characteristic is kept only if both input streams have it.
        final int characteristics = leftSpliter.characteristics()
                & rightSpliter.characteristics()
                & ~(Spliterator.DISTINCT | Spliterator.SORTED);
        final long size = Math.min(leftSpliter.estimateSize(),
                                   rightSpliter.estimateSize());

        final Spliterator<W> cs = new ZipperSpliterator<>(leftSpliter,
                rightSpliter, zipper, size, characteristics);
        return StreamSupport.stream(cs, parallel);
    }
}
===========================================================
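To see what the characteristics rule in zip() does in practice, the sketch below applies the same bit arithmetic to a TreeSet spliterator (documented as SIZED, DISTINCT, SORTED and ORDERED) and an ArrayList spliterator (documented as SIZED, SUBSIZED and ORDERED). The class and method names are hypothetical. Only ORDERED and SIZED survive. SUBSIZED is missing on the TreeSet side. DISTINCT and SORTED are always cleared because the zipper can map distinct pairs to equal results, and nothing is known about how the zipped values are ordered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Spliterator;
import java.util.TreeSet;

public class ZipCharacteristicsDemo {
    // Same rule as zip(): keep only the flags both inputs report, then clear
    // DISTINCT and SORTED, which describe the inputs' elements rather than
    // the zipper's results.
    static int zippedCharacteristics(Spliterator<?> left, Spliterator<?> right) {
        return left.characteristics() & right.characteristics()
                & ~(Spliterator.DISTINCT | Spliterator.SORTED);
    }

    public static void main(String[] args) {
        Spliterator<Integer> sortedSet = new TreeSet<>(Arrays.asList(3, 1, 2)).spliterator();
        Spliterator<String> list = new ArrayList<>(Arrays.asList("x", "y", "z")).spliterator();
        int c = zippedCharacteristics(sortedSet, list);
        System.out.println("ORDERED:  " + ((c & Spliterator.ORDERED) != 0));  // true
        System.out.println("SIZED:    " + ((c & Spliterator.SIZED) != 0));    // true
        System.out.println("SUBSIZED: " + ((c & Spliterator.SUBSIZED) != 0)); // false
        System.out.println("SORTED:   " + ((c & Spliterator.SORTED) != 0));   // false
        System.out.println("DISTINCT: " + ((c & Spliterator.DISTINCT) != 0)); // false
    }
}
```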
Brent
On Mon, Jan 13, 2014 at 12:05 PM, Paul Sandoz <paul.sandoz at oracle.com> wrote:
> Hi,
>
> We originally had zip, see below, and pulled it for various and familiar
> reasons (Pair class, nominal functions for primitives, poor parallelism
> etc).
>
> It's tricky for the ZipperSpliterator to support explicit splitting, since
> the underlying A/B spliterators may not split along the same boundaries,
> so the two halves of a split need not cover the same number of elements.
>
> If one naively splits a ZipperSpliterator, elements might be dropped
> when traversing those splits (this may also depend on the depth of
> splitting, which is determined by the number of cores), and developers
> might find that surprising. For example, consider zipping an ArrayList
> with a LinkedList: both have an encounter order but very different
> splitting characteristics.
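To make the ArrayList/LinkedList example concrete, here is a small sketch (hypothetical names throughout) of what happens if a zip splits each input independently and then pairs prefix with prefix and suffix with suffix. In OpenJDK's implementation, an ArrayList spliterator splits at its midpoint, while a LinkedList spliterator splits off a leading batch copied into an array, which for a 10-element list is the whole list; other implementations may split differently. Either way the prefixes no longer line up, so the naive result has fewer pairs than the unsplit one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;

public class NaiveZipSplitDemo {
    // Sequentially zips two spliterators into "left:right" strings,
    // stopping as soon as either side is exhausted.
    static <A, B> List<String> zipToList(Spliterator<A> left, Spliterator<B> right) {
        Iterator<A> l = Spliterators.iterator(left);
        Iterator<B> r = Spliterators.iterator(right);
        List<String> out = new ArrayList<>();
        while (l.hasNext() && r.hasNext()) {
            out.add(l.next() + ":" + r.next());
        }
        return out;
    }

    // HYPOTHETICAL naive splitting: split each input on its own, then zip
    // prefix with prefix and suffix with suffix.
    static <A, B> List<String> naiveSplitZip(List<A> xs, List<B> ys) {
        Spliterator<A> leftSuffix = xs.spliterator();
        Spliterator<B> rightSuffix = ys.spliterator();
        Spliterator<A> leftPrefix = leftSuffix.trySplit();
        Spliterator<B> rightPrefix = rightSuffix.trySplit();
        if (leftPrefix == null || rightPrefix == null) {
            // Keep the demo simple: fall back to an unsplit traversal.
            return zipToList(xs.spliterator(), ys.spliterator());
        }
        List<String> out = zipToList(leftPrefix, rightPrefix);
        out.addAll(zipToList(leftSuffix, rightSuffix));
        return out;
    }

    public static void main(String[] args) {
        List<Integer> arrayList = new ArrayList<>();
        List<Integer> linkedList = new LinkedList<>();
        for (int i = 0; i < 10; i++) {
            arrayList.add(i);
            linkedList.add(i);
        }
        System.out.println("unsplit:     " + zipToList(arrayList.spliterator(), linkedList.spliterator()));
        System.out.println("naive split: " + naiveSplitZip(arrayList, linkedList));
    }
}
```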
>
> If both A/B spliterators support SIZED/SUBSIZED then we could
> theoretically pass hints to trySplit.
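Since trySplit() takes no arguments, there is no direct way to pass such a hint through the Spliterator API. One workaround is index-based splitting, sketched below under the assumption that both sources are random-access Lists. It bypasses the input spliterators and has the zip spliterator index both lists itself, so every split cuts both sides at the same position. IndexedZip and IndexedZipSpliterator are hypothetical names, not JDK classes. This is a different technique from the hint-passing described above, not an implementation of it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IndexedZip {
    // Hypothetical spliterator over two lists (assumed RandomAccess; get(i) on
    // a LinkedList would still be correct, just O(n) per call).
    static final class IndexedZipSpliterator<A, B, C> implements Spliterator<C> {
        private final List<A> left;
        private final List<B> right;
        private final BiFunction<? super A, ? super B, ? extends C> zipper;
        private int index;       // next index to emit
        private final int fence; // one past the last index covered

        IndexedZipSpliterator(List<A> left, List<B> right,
                              BiFunction<? super A, ? super B, ? extends C> zipper,
                              int index, int fence) {
            this.left = left;
            this.right = right;
            this.zipper = zipper;
            this.index = index;
            this.fence = fence;
        }

        @Override
        public boolean tryAdvance(Consumer<? super C> action) {
            if (index >= fence) {
                return false;
            }
            int i = index++;
            action.accept(zipper.apply(left.get(i), right.get(i)));
            return true;
        }

        @Override
        public Spliterator<C> trySplit() {
            int mid = (index + fence) >>> 1;
            if (mid <= index) {
                return null;
            }
            // The prefix [index, mid) is handed off; both lists are cut at
            // the same index by construction, so no pair is lost.
            Spliterator<C> prefix = new IndexedZipSpliterator<>(left, right, zipper, index, mid);
            index = mid;
            return prefix;
        }

        @Override
        public long estimateSize() {
            return fence - index;
        }

        @Override
        public int characteristics() {
            return ORDERED | SIZED | SUBSIZED;
        }
    }

    static <A, B, C> Stream<C> zip(List<A> left, List<B> right,
                                   BiFunction<? super A, ? super B, ? extends C> zipper,
                                   boolean parallel) {
        int size = Math.min(left.size(), right.size());
        Spliterator<C> s = new IndexedZipSpliterator<>(left, right, zipper, 0, size);
        return StreamSupport.stream(s, parallel);
    }

    public static void main(String[] args) {
        List<Integer> xs = new ArrayList<>();
        List<String> ys = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            xs.add(i);
            ys.add("s" + i);
        }
        List<String> zipped = zip(xs, ys, (x, y) -> x + "=" + y, true)
                .collect(Collectors.toList());
        System.out.println(zipped.size() + " " + zipped.get(999)); // 1000 999=s999
    }
}
```

The price is generality: this only works for sources with cheap indexed access, which is exactly the case where SIZED/SUBSIZED hold.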
>
> Paul.
>
> public static <A, B, C> Stream<C> zip(Stream<? extends A> a,
> Stream<? extends B> b,
> BiFunction<? super A, ? super B,
> ? extends C> zipper) {
> Objects.requireNonNull(zipper);
> @SuppressWarnings("unchecked")
> Spliterator<A> as = (Spliterator<A>)
> Objects.requireNonNull(a).spliterator();
> @SuppressWarnings("unchecked")
> Spliterator<B> bs = (Spliterator<B>)
> Objects.requireNonNull(b).spliterator();
>
> // Combining loses DISTINCT and SORTED characteristics and for other
> // characteristics the combined stream has a characteristic if both
> // streams to combine have the characteristic
> int characteristics = as.characteristics() & bs.characteristics() &
> ~(Spliterator.DISTINCT | Spliterator.SORTED);
> long size = Math.min(as.estimateSize(), bs.estimateSize());
>
> Spliterator<C> cs = new ZipperSpliterator<>(as, bs, zipper,
> size, characteristics);
> return (a.isParallel() || b.isParallel())
> ? StreamSupport.parallelStream(cs)
> : StreamSupport.stream(cs);
> }
>
> private static final class ZipperSpliterator<A, B, C> extends
> Spliterators.AbstractSpliterator<C>
> implements Consumer<Object> {
> final Spliterator<A> as;
> final Spliterator<B> bs;
> final BiFunction<? super A, ? super B, ? extends C> zipper;
> Object a;
> Object b;
>
> ZipperSpliterator(Spliterator<A> as, Spliterator<B> bs,
> BiFunction<? super A, ? super B, ? extends C>
> zipper,
> long est, int additionalCharacteristics) {
> super(est, additionalCharacteristics);
> this.as = as;
> this.bs = bs;
> this.zipper = zipper;
> this.a = Streams.NONE;
> }
>
> @Override
> public void accept(Object aOrB) {
> if (a == Streams.NONE) {
> a = aOrB;
> }
> else {
> b = aOrB;
> }
> }
>
> @Override
> @SuppressWarnings("unchecked")
> public boolean tryAdvance(Consumer<? super C> action) {
> if (as.tryAdvance(this) && bs.tryAdvance(this)) {
> Object a = this.a;
> this.a = Streams.NONE;
> action.accept(zipper.apply((A) a, (B) b));
> return true;
> }
> return false;
> }
> }
More information about the lambda-dev mailing list