[concurrency-interest] class SplittableRandom

Paul Sandoz paul.sandoz at oracle.com
Fri Jul 12 03:28:21 PDT 2013


I am not sure whether this is an important use case, but it is fun to think about.

One solution is to write a zipper producing a tuple of {List<T>, rn}, but, when mapping, it would incur an l.get(rn % l.size()) that might result in over-represented values due to modulo bias (though perhaps not when the list size evenly divides the range of the random values).
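
To illustrate the bias (a hypothetical snippet, not part of the proposal): the lower indices come up more often whenever the list size does not evenly divide the range the random values are drawn from.

    // Hypothetical illustration: count how often each index is produced
    // by rn % size when rn is drawn uniformly from [0, range).
    int range = 10, size = 3;   // 10 is not a multiple of 3
    int[] counts = new int[size];
    for (int rn = 0; rn < range; rn++)
        counts[rn % size]++;
    // counts is {4, 3, 3} -- index 0 is over-represented.
    // With range = 8 and size = 4 the counts come out even, since 4 divides 8.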

A better solution is to write your own spliterator that defers to an underlying spliterator, tracks splits, and reports a tuple of {List<T>, SplittableRandom} when traversed, e.g.:

  Stream<List<Integer>> s = ...
  withSplittableRandom(s, new SplittableRandom()).map(t -> t.a().get(t.b().nextInt(t.a().size())))

The withSplittableRandom method would do something like:

  return StreamSupport.stream(createWrappingSplittableRandomSpliterator(s.spliterator(), sr), false);
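
Spelled out a little more (an untested sketch, using the Pair and spliterator classes below; the choice to preserve the source stream's parallelism is my assumption):

    // Assumes the usual java.util.stream imports (Stream, StreamSupport).
    static <T> Stream<Pair<T, SplittableRandom>> withSplittableRandom(
            Stream<T> s, SplittableRandom sr) {
        // Wrap the source spliterator; each split will carry its own SplittableRandom.
        return StreamSupport.stream(
                new WrappingSplittableRandomSpliterator<>(s.spliterator(), sr),
                s.isParallel());
    }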

I have not tested the code below, but I think it should work if you want to play with it.

Paul.

    import java.util.Comparator;
    import java.util.Spliterator;
    import java.util.SplittableRandom;
    import java.util.function.Consumer;

    interface Pair<A, B> {
        A a();
        B b();

        static <A, B> Pair<A, B> of(A a, B b) {
            return new Pair<A, B>() {
                @Override
                public A a() {
                    return a;
                }

                @Override
                public B b() {
                    return b;
                }
            };
        }
    }

    public class WrappingSplittableRandomSpliterator<T> implements Spliterator<Pair<T, SplittableRandom>> {

        final Spliterator<T> s;

        final SplittableRandom sr;

        public WrappingSplittableRandomSpliterator(Spliterator<T> s, SplittableRandom sr) {
            this.s = s;
            this.sr = sr;
        }

        // Adapt a consumer of pairs to a consumer of plain elements,
        // pairing each element with this spliterator's SplittableRandom.
        Consumer<T> pairConsumer(Consumer<? super Pair<T, SplittableRandom>> action) {
            return e -> action.accept(Pair.of(e, sr));
        }

        @Override
        public boolean tryAdvance(Consumer<? super Pair<T, SplittableRandom>> action) {
            return s.tryAdvance(pairConsumer(action));
        }

        @Override
        public void forEachRemaining(Consumer<? super Pair<T, SplittableRandom>> action) {
            s.forEachRemaining(pairConsumer(action));
        }

        @Override
        public WrappingSplittableRandomSpliterator<T> trySplit() {
            // Fork the random generator alongside the spliterator so each
            // split traverses with its own SplittableRandom.
            Spliterator<T> split = s.trySplit();
            return split == null
                   ? null
                   : new WrappingSplittableRandomSpliterator<>(split, sr.split());
        }

        @Override
        public long estimateSize() {
            return s.estimateSize();
        }

        @Override
        public long getExactSizeIfKnown() {
            return s.getExactSizeIfKnown();
        }

        @Override
        public int characteristics() {
            return s.characteristics();
        }

        @Override
        public boolean hasCharacteristics(int characteristics) {
            return s.hasCharacteristics(characteristics);
        }

        @Override
        public Comparator<? super Pair<T, SplittableRandom>> getComparator() {
            // Order pairs by their wrapped element. A null comparator from the
            // source spliterator means natural order, so compare the elements
            // directly (unchecked cast, since Pair itself is not Comparable).
            Comparator<? super T> c = s.getComparator();
            return c == null
                   ? (p1, p2) -> ((Comparable<? super T>) p1.a()).compareTo(p2.a())
                   : (p1, p2) -> c.compare(p1.a(), p2.a());
        }
    }
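
For completeness, a quick end-to-end sketch of how it might be used (untested, assuming the withSplittableRandom helper sketched above, plus java.util.Arrays and java.util.stream.Collectors):

    List<List<Integer>> lists = Arrays.asList(
            Arrays.asList(1, 2, 3),
            Arrays.asList(4, 5, 6, 7));

    // Pick one (pseudo-)random element from each inner list.
    List<Integer> sample =
            withSplittableRandom(lists.parallelStream(), new SplittableRandom())
                    .map(t -> t.a().get(t.b().nextInt(t.a().size())))
                    .collect(Collectors.toList());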


On Jul 11, 2013, at 4:29 PM, Peter Levart <peter.levart at gmail.com> wrote:

> On 07/11/2013 01:14 AM, Aaron Grunthal wrote:
>> Would this be of any use in a case like this?
>> 
>> List<List<Integer>> a;
>> 
>> a.parallelStream().filter(...).map(e->e.get(random.nextInt(e.size()))).reduce(...) 
>> 
>> 
>> i.e. a filter, random sample, reduce chain?
>> 
>> I don't see a way to basically use two (or N) streams (the source and 
>> the random numbers) and merge them in lockstep at one stage.
>> 
> 
> In the absence of Stream.join(), we would need something like the 
> following in the Stream API:
> 
> Stream<T> {
> 
>     <R, S> Stream<R> splitMap(S seed,
>                               UnaryOperator<S> splitter,
>                               BiFunction<? super T, ? super S, ? extends R> mapper);
> }
> 
> 
> Your example would then read:
> 
> List<List<Integer>> a = ...;
> 
> a.parallelStream()
>     .filter(...)
>     .splitMap(
>         new SplittableRandom(),
>         sr -> sr.split(),
>         (list, sr) -> list.get(sr.nextInt(list.size()))
>     )
>     .reduce(...)
> 
> 
> But I don't know whether such an API would be usable in any other scenarios, though.
> 
> Regards, Peter
> 
> 
>> On 10.07.2013 21:13, Doug Lea wrote:
>>> [Note: I'm also posting this on the openjdk core-libs-dev list.]
>>> 
>>> 
>>> We expect that using random numbers in parallel Stream computations
>>> will be common. (We know it is common in parallel computing in
>>> general.) But we had left support for it in an unsatisfactory state.
>>> If you want to create a stream of random numbers to drive a parallel
>>> computation, you'd choose among two options, neither of them providing
>>> what you probably want: (1) Use a stream based on a single shared
>>> java.util.Random object, in which case your program will encounter
>>> stunning slowdowns when run with many cores; or (2) Use a stream based
>>> on ThreadLocalRandom, which avoids contention, but gives you no
>>> control over the use or properties of the per-thread singleton Random
>>> object. While the ThreadLocalRandom option is great for many purposes,
>>> you wouldn't want to use it in, say, a high-quality Monte Carlo
>>> simulation.
>> 
>> 
> 
> 


