Zipping Steams...
Brent Walker
brenthwalker at gmail.com
Sun Jan 12 17:53:07 PST 2014
Thanks for that Sam. Somehow forgot that streams give you access to their
spliterators. Your code worked (almost) out of the box for me (modulo some
data structure mappings...).
Here is my version which has 3 minor improvements on top of yours.
1. In tryAdvance() the code below avoids the allocation of the
AtomicBoolean by adding the boolean as a member to the ZipSpliterator class
(I am guessing you did not really want the atomicity guarantees here -- you
just wanted the reference aspect so that the variable was effectively final
so that you can use it in the closure).
2. In trySplit, if the first spliterator refuses to split there is no point
in calling the second one asking for a split -- you were always calling
both but ignoring the second if the first one was null.
3. In estimateSize(), you only consult the estimateSize() of the left
spliterator. Since we stop the moment the any of the two streams runs out
perhaps a better estimate of the size of the new stream size is:
Math.min(mLeftSpliter.estimateSize(), mRightSpliter.estimateSize())
Here is the code:
==========================================================
private static final class ZipSpliterator<T, U> implements
Spliterator<Pair<T, U>> {
private final Spliterator<T> mLeftSpliter;
private final Spliterator<U> mRightSpliter;
private boolean mTryAdvanceStatus;
public static <T, U> ZipSpliterator<T, U> create(final Spliterator<T>
leftSpliter,
final Spliterator<U>
rightSpliter) {
return new ZipSpliterator<>(leftSpliter, rightSpliter);
}
private ZipSpliterator(final Spliterator<T> leftSpliter, final
Spliterator<U> rightSpliter) {
mLeftSpliter = leftSpliter;
mRightSpliter = rightSpliter;
}
@Override
public boolean tryAdvance(final Consumer<? super Pair<T, U>> action) {
mTryAdvanceStatus = false;
mLeftSpliter.tryAdvance(leftElem -> {
mRightSpliter.tryAdvance(rightElem -> {
action.accept(Pair.create(leftElem, rightElem));
mTryAdvanceStatus = true;
});
});
return mTryAdvanceStatus;
}
@Override
public Spliterator<Pair<T, U>> trySplit() {
final Spliterator<T> leftSplit;
final Spliterator<U> rightSplit;
if ((leftSplit = mLeftSpliter.trySplit()) == null) {
return null;
}
else if ((rightSplit = mRightSpliter.trySplit()) == null) {
return null;
}
else {
return ZipSpliterator.create(leftSplit, rightSplit);
}
}
@Override
public long estimateSize() {
return Math.min(mLeftSpliter.estimateSize(),
mRightSpliter.estimateSize());
}
@Override
public int characteristics() {
return mLeftSpliter.characteristics() &
mRightSpliter.characteristics();
}
}
public static <T, U> Stream<Pair<T, U>> zip(final Stream<T> ts, final
Stream<U> us) {
return StreamSupport.stream(ZipSpliterator.create(ts.spliterator(),
us.spliterator()), false);
}
==========================================================
Brent
On Mon, Jan 13, 2014 at 2:15 AM, Sam Pullara <spullara at gmail.com> wrote:
> Something like this I think would work pretty well. Would love a
> suggestion to get rid of the side-effect in tryAdvance. This might even
> parallelize?
>
> Sam
>
> package spullara.util;
>
> import java.util.Spliterator;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.function.Consumer;
> import java.util.stream.Stream;
> import java.util.stream.StreamSupport;
>
> /**
> * Zip two streams as efficiently as we can?
> */
> public class Zipper {
> public static class Pair<L, R> {
> final L _1;
> final R _2;
>
> public Pair(L l, R r) {
> _1 = l;
> _2 = r;
> }
> }
>
> public static <L, R> Stream<Pair<L, R>> zip(Stream<L> left, Stream<R>
> right) {
> Spliterator<L> lsplit = left.spliterator();
> Spliterator<R> rsplit = right.spliterator();
> Spliterator<Pair<L, R>> split = new PairSpliterator<>(lsplit,
> rsplit);
> return StreamSupport.stream(split, false);
> }
>
> private static class PairSpliterator<L, R> implements
> Spliterator<Pair<L, R>> {
> private final Spliterator<L> lsplit;
> private final Spliterator<R> rsplit;
>
> public PairSpliterator(Spliterator<L> lsplit, Spliterator<R>
> rsplit) {
> this.lsplit = lsplit;
> this.rsplit = rsplit;
> }
>
> @Override
> public boolean tryAdvance(Consumer<? super Pair<L, R>> action) {
> AtomicBoolean advance = new AtomicBoolean();
> lsplit.tryAdvance(l -> {
> boolean b = rsplit.tryAdvance(r -> {
> action.accept(new Pair<>(l, r));
> });
> advance.set(b);
> });
> return advance.get();
> }
>
> @Override
> public Spliterator<Pair<L, R>> trySplit() {
> Spliterator<L> lSpliterator = lsplit.trySplit();
> Spliterator<R> rSpliterator = rsplit.trySplit();
> return lSpliterator == null ? null : rSpliterator == null ?
> null : new PairSpliterator<>(lSpliterator, rSpliterator);
> }
>
> @Override
> public long estimateSize() {
> return lsplit.estimateSize();
> }
>
> @Override
> public int characteristics() {
> return lsplit.characteristics() & rsplit.characteristics();
> }
> }
> }
>
> On Jan 12, 2014, at 3:58 PM, Brent Walker <brenthwalker at gmail.com> wrote:
>
> > Suppose I wanted to implement Haskell's or ML's zip() function for
> streams
> > (assume I have my own Pair<U, T> class). One easily comes up with the
> > following function:
> >
> > public static <T, U> Stream<Pair<T, U>> zip(final Stream<T> ts, final
> > Stream<U> us) {
> > @SuppressWarnings("unchecked")
> > final T[] tsVec = (T[]) ts.toArray();
> >
> > @SuppressWarnings("unchecked")
> > final U[] usVec = (U[]) us.toArray();
> >
> > final int siz = Math.min(tsVec.length, usVec.length);
> >
> > return IntStream.range(0, siz).mapToObj(i -> Pair.create(tsVec[i],
> > usVec[i]));
> > }
> >
> > This works fine for my needs but as a general routine this function has
> > issues. First, it doesn't work at all for infinite streams, and second,
> > there are those intermediate data structures it builds (tsVec, and usVec)
> > in order to function which are wasteful (at least they look wasteful).
> >
> > Is there a better way to implement zip that avoids the above two
> problems?
> >
> > Thanks for any suggestions/ideas etc.
> >
> > Brent
> >
>
>
More information about the lambda-dev
mailing list