Zipping Streams...

Paul Sandoz paul.sandoz at oracle.com
Mon Jan 13 02:05:36 PST 2014


Hi,

We originally had zip (see below) and pulled it for various familiar reasons (Pair class, nominal functions for primitives, poor parallelism, etc.).

It's tricky for the ZipperSpliterator to support explicit splitting, since the underlying A/B spliterators may not split along the same boundaries, so the two halves of a split need not cover the same number of elements.

If one tries to split the ZipperSpliterator anyway, then elements might be dropped when traversing those splits (this may also depend on the level of splitting, which is determined by the number of cores), and developers might find that surprising. For example, consider zipping an ArrayList with a LinkedList, both of which have an encounter order but very different splitting characteristics.
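
To make the ArrayList/LinkedList example concrete, here is a small sketch that splits each list's spliterator once and compares the size of the returned prefix. The class and method names (SplitBoundaries, prefixSize, fill) are made up for illustration, and the exact split points are an implementation detail of the JDK in use, so treat the numbers as indicative only.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Spliterator;

// Hypothetical helper class, not part of the JDK or of the original patch.
final class SplitBoundaries {

    /** Splits the list's spliterator once and returns the size estimate of the prefix. */
    static long prefixSize(List<Integer> list) {
        Spliterator<Integer> rest = list.spliterator();
        Spliterator<Integer> prefix = rest.trySplit();
        return prefix == null ? 0 : prefix.estimateSize();
    }

    /** Adds 0..n-1 to the given list and returns it. */
    static List<Integer> fill(List<Integer> target, int n) {
        for (int i = 0; i < n; i++) {
            target.add(i);
        }
        return target;
    }

    public static void main(String[] args) {
        long arrayPrefix = prefixSize(fill(new ArrayList<>(), 2000));
        long linkedPrefix = prefixSize(fill(new LinkedList<>(), 2000));
        // If the two numbers differ, a zipped split pairing these two prefixes
        // could only pair up min(arrayPrefix, linkedPrefix) elements.
        System.out.println("ArrayList prefix:  " + arrayPrefix);
        System.out.println("LinkedList prefix: " + linkedPrefix);
    }
}
```

On the OpenJDK implementations I have looked at, ArrayList splits at the midpoint while LinkedList copies off a fixed-size batch, so the two prefixes cover different numbers of elements even though the lists have the same length.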

If both A/B spliterators support SIZED/SUBSIZED then we could theoretically pass hints to trySplit.
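
As a rough illustration of the SIZED/SUBSIZED condition, the sketch below checks whether two spliterators both report those characteristics, and whether a single split on each happens to produce prefixes of the same exact size. LockstepSplit, bothSubsized and splitsAlign are hypothetical names. Note that trySplit takes no arguments today, so this only probes for alignment after the fact; it is not the hinting mechanism suggested above.

```java
import java.util.Spliterator;

// Hypothetical helper class for illustration only.
final class LockstepSplit {

    /** True if both spliterators report SIZED and SUBSIZED. */
    static boolean bothSubsized(Spliterator<?> a, Spliterator<?> b) {
        int required = Spliterator.SIZED | Spliterator.SUBSIZED;
        return a.hasCharacteristics(required) && b.hasCharacteristics(required);
    }

    /**
     * Splits each spliterator once and returns true only if both produced a
     * prefix and the two prefixes have the same exact size.
     * Splitting is destructive: the arguments are left as the remainders.
     */
    static boolean splitsAlign(Spliterator<?> a, Spliterator<?> b) {
        if (!bothSubsized(a, b)) {
            return false;
        }
        Spliterator<?> aPrefix = a.trySplit();
        Spliterator<?> bPrefix = b.trySplit();
        if (aPrefix == null || bPrefix == null) {
            return false;
        }
        return aPrefix.getExactSizeIfKnown() == bPrefix.getExactSizeIfKnown();
    }
}
```

Because a split cannot be undone, a zipping spliterator could not use a probe like splitsAlign to decide whether to split; it would need a way to tell each side where to split before the fact, which is what a hint to trySplit would provide.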

Paul.

   public static <A, B, C> Stream<C> zip(Stream<? extends A> a,
                                         Stream<? extends B> b,
                                         BiFunction<? super A, ? super B, ? extends C> zipper) {
       Objects.requireNonNull(zipper);
       @SuppressWarnings("unchecked")
       Spliterator<A> as = (Spliterator<A>) Objects.requireNonNull(a).spliterator();
       @SuppressWarnings("unchecked")
       Spliterator<B> bs = (Spliterator<B>) Objects.requireNonNull(b).spliterator();

       // Combining loses DISTINCT and SORTED characteristics and for other
       // characteristics the combined stream has a characteristic if both
       // streams to combine have the characteristic
       int characteristics = as.characteristics() & bs.characteristics() &
                             ~(Spliterator.DISTINCT | Spliterator.SORTED);
       long size = Math.min(as.estimateSize(), bs.estimateSize());

       Spliterator<C> cs = new ZipperSpliterator<>(as, bs, zipper,
                                                   size, characteristics);
       // The resulting stream is parallel if either input stream is parallel
       return StreamSupport.stream(cs, a.isParallel() || b.isParallel());
   }

   private static final class ZipperSpliterator<A, B, C> extends Spliterators.AbstractSpliterator<C>
           implements Consumer<Object> {
       final Spliterator<A> as;
       final Spliterator<B> bs;
       final BiFunction<? super A, ? super B, ? extends C> zipper;
       Object a;
       Object b;

       ZipperSpliterator(Spliterator<A> as, Spliterator<B> bs,
                         BiFunction<? super A, ? super B, ? extends C> zipper,
                         long est, int additionalCharacteristics) {
           super(est, additionalCharacteristics);
           this.as = as;
           this.bs = bs;
           this.zipper = zipper;
           this.a = Streams.NONE;
       }

       @Override
       public void accept(Object aOrB) {
           if (a == Streams.NONE) {
               a = aOrB;
           }
           else {
               b = aOrB;
           }
       }

       @Override
       @SuppressWarnings("unchecked")
       public boolean tryAdvance(Consumer<? super C> action) {
           if (as.tryAdvance(this) && bs.tryAdvance(this)) {
               Object a = this.a;
               this.a = Streams.NONE;
               action.accept(zipper.apply((A) a, (B) b));
               return true;
           }
           return false;
       }
   }
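
The listing above depends on Streams.NONE, which is internal to java.util.stream, so it does not compile outside the JDK. For anyone who wants to experiment, here is a minimal sketch of the same idea using only public API. ZipSketch is a made-up name, the element pairing goes through Spliterators.iterator rather than the Consumer trick, and only ORDERED/SIZED/SUBSIZED are carried over (a deliberately more conservative choice than the original). It inherits the same splitting limitations discussed above.

```java
import java.util.Iterator;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical stand-alone variant; not the code that was in the JDK.
final class ZipSketch {

    static <A, B, C> Stream<C> zip(Stream<? extends A> a,
                                   Stream<? extends B> b,
                                   BiFunction<? super A, ? super B, ? extends C> zipper) {
        Objects.requireNonNull(zipper);
        Spliterator<? extends A> as = Objects.requireNonNull(a).spliterator();
        Spliterator<? extends B> bs = Objects.requireNonNull(b).spliterator();

        // Assumption: keep only the characteristics that clearly survive pairing.
        int characteristics = as.characteristics() & bs.characteristics()
                & (Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SUBSIZED);
        long size = Math.min(as.estimateSize(), bs.estimateSize());

        Iterator<A> ai = Spliterators.iterator(as);
        Iterator<B> bi = Spliterators.iterator(bs);

        Spliterator<C> cs = new Spliterators.AbstractSpliterator<C>(size, characteristics) {
            @Override
            public boolean tryAdvance(Consumer<? super C> action) {
                // Stop as soon as either side runs out of elements.
                if (ai.hasNext() && bi.hasNext()) {
                    action.accept(zipper.apply(ai.next(), bi.next()));
                    return true;
                }
                return false;
            }
        };
        return StreamSupport.stream(cs, a.isParallel() || b.isParallel());
    }

    public static void main(String[] args) {
        zip(Stream.of(1, 2, 3), Stream.of("a", "b"), (i, s) -> i + s)
                .forEach(System.out::println);
    }
}
```

The result is truncated to the shorter input. Any parallelism comes only from the batching trySplit inherited from AbstractSpliterator, which is part of the "poor parallelism" concern.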



More information about the lambda-dev mailing list