Zipping Steams...
Paul Sandoz
paul.sandoz at oracle.com
Mon Jan 13 02:05:36 PST 2014
Hi,
We originally had zip, see below, and pulled it for various and familiar reasons (Pair class, nominal functions for primitives, poor parallelism etc).
It's tricky for the ZipperSpliterator to support explicit splitting since the underlying A/B spliterators may not split along the same boundaries so that both cover the same number of elements.
If one tries to split as ZipSpliterator then elements might be dropped when traversing those splits (it may also depend on the level of splitting too, which is determined by the #cores) and developers might find that surprising. For example, consider zipping given an ArrayList and a LinkedList, both of which have an encounter order but very different splitting characteristics.
If both A/B spliterators support SIZED/SUBSIZED then we could theoretically pass hints to trySplit.
Paul.
public static <A, B, C> Stream<C> zip(Stream<? extends A> a,
Stream<? extends B> b,
BiFunction<? super A, ? super B, ? extends C> zipper) {
Objects.requireNonNull(zipper);
@SuppressWarnings("unchecked")
Spliterator<A> as = (Spliterator<A>) Objects.requireNonNull(a).spliterator();
@SuppressWarnings("unchecked")
Spliterator<B> bs = (Spliterator<B>) Objects.requireNonNull(b).spliterator();
// Combining loses DISTINCT and SORTED characteristics and for other
// characteristics the combined stream has a characteristic if both
// streams to combine have the characteristic
int characteristics = as.characteristics() & bs.characteristics() &
~(Spliterator.DISTINCT | Spliterator.SORTED);
long size = Math.min(as.estimateSize(), bs.estimateSize());
Spliterator<C> cs = new ZipperSpliterator<>(as, bs, zipper,
size, characteristics);
return (a.isParallel() || b.isParallel())
? StreamSupport.parallelStream(cs)
: StreamSupport.stream(cs);
}
private static final class ZipperSpliterator<A, B, C> extends Spliterators.AbstractSpliterator<C>
implements Consumer<Object> {
final Spliterator<A> as;
final Spliterator<B> bs;
final BiFunction<? super A, ? super B, ? extends C> zipper;
Object a;
Object b;
ZipperSpliterator(Spliterator<A> as, Spliterator<B> bs,
BiFunction<? super A, ? super B, ? extends C> zipper,
long est, int additionalCharacteristics) {
super(est, additionalCharacteristics);
this.as = as;
this.bs = bs;
this.zipper = zipper;
this.a = Streams.NONE;
}
@Override
public void accept(Object aOrB) {
if (a == Streams.NONE) {
a = aOrB;
}
else {
b = aOrB;
}
}
@Override
@SuppressWarnings("unchecked")
public boolean tryAdvance(Consumer<? super C> action) {
if (as.tryAdvance(this) && bs.tryAdvance(this)) {
Object a = this.a;
this.a = Streams.NONE;
action.accept(zipper.apply((A) a, (B) b));
return true;
}
return false;
}
}
More information about the lambda-dev
mailing list