RFR: 8072727 - add variation of Stream.iterate() that's finite

Tagir F. Valeev amaembo at gmail.com
Fri Mar 11 08:27:19 UTC 2016


Using the IntEmitter, the Craps round could also be implemented easily
and without additional classes:

public static IntEmitter crapsEmitter(IntSupplier dice, int point) {
  return action -> {
    int roll = dice.getAsInt();
    action.accept(roll);
    return roll == 7 || (point == 0 && (roll > 10 || roll < 4)) || (point != 0 && roll == point)
        ? null : crapsEmitter(dice, point == 0 ? roll : point);
  };
}

Usage:

Random r = new Random();
IntSupplier dice = () -> r.nextInt(6) + r.nextInt(6) + 2;
crapsEmitter(dice, 0).stream().forEach(System.out::println);
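
IntEmitter itself is defined only in the linked test class. For readers
without that file at hand, here is a minimal sketch of what such an
interface could look like. The method name emit, the AbstractIntSpliterator
plumbing and the scripted dice are my assumptions for illustration, and the
real class may differ:

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

// Hypothetical reconstruction; not the code from ProduceTest.java.
@FunctionalInterface
interface IntEmitter {
    // Passes exactly one value to the action and returns the emitter for
    // the next step, or null when the sequence is finished.
    IntEmitter emit(IntConsumer action);

    default IntStream stream() {
        IntEmitter first = this;
        return StreamSupport.intStream(
            new Spliterators.AbstractIntSpliterator(Long.MAX_VALUE, Spliterator.ORDERED) {
                private IntEmitter current = first;

                @Override
                public boolean tryAdvance(IntConsumer action) {
                    if (current == null) {
                        return false;
                    }
                    current = current.emit(action);
                    return true;
                }
            }, false);
    }
}

final class IntEmitterDemo {
    static IntEmitter crapsEmitter(IntSupplier dice, int point) {
        return action -> {
            int roll = dice.getAsInt();
            action.accept(roll);
            return roll == 7 || (point == 0 && (roll > 10 || roll < 4)) || (point != 0 && roll == point)
                ? null : crapsEmitter(dice, point == 0 ? roll : point);
        };
    }

    public static void main(String[] args) {
        // Scripted rolls instead of Random so the run is reproducible.
        int[] rolls = {5, 8, 5};
        int[] index = {0};
        // Prints 5, 8, 5: the point is set to 5 and the round ends when 5 is rolled again.
        crapsEmitter(() -> rolls[index[0]++], 0).stream().forEach(System.out::println);
    }
}
```

The spliterator relies on each emitter calling the action exactly once,
which is a convention of this sketch and is not enforced anywhere.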

TFV> Here's the source code I wrote for this test:

TFV> http://cr.openjdk.java.net/~tvaleev/patches/produce/ProduceTest.java

TFV> I also have a somewhat weird idea of emitters (added to this test class
TFV> as well). Here's an example:

TFV> public static IntEmitter collatzEmitter(int val) {
TFV>     return action -> {
TFV>         action.accept(val);
TFV>         return val == 1 ? null : collatzEmitter(val % 2 == 0 ? val / 2 : val * 3 + 1);
TFV>     };
TFV> }

TFV> Usage:

TFV> collatzEmitter(17).stream().forEach(System.out::println);

TFV> It needs neither a special value nor special handling of the
TFV> starting value. No additional code for finishing is necessary either.
TFV> However, such a solution surely produces garbage (at least one lambda
TFV> instance per iteration), so the performance would degrade. Still, I
TFV> like it.

TFV> With best regards,
TFV> Tagir Valeev.

TFV>> Hello!

TFV>> Just for your information I implemented two alternative stream
TFV>> producers as you suggested:

TFV>> // Produces stream of values returned by producer until the first
TFV>> // empty optional is produced
TFV>> public static IntStream produce(Supplier<OptionalInt> producer);

TFV>> // Produces stream of values until predicate returns false
TFV>> // iff predicate returns true it must call the supplied IntConsumer
TFV>> // exactly once (like in Spliterator.tryAdvance)
TFV>> public static IntStream produce(Predicate<IntConsumer> producer);

TFV>> // Produces stream of values until advancer doesn't call the
TFV>> // IntConsumer at least once.
TFV>> // Calling the IntConsumer multiple times is also acceptable.
TFV>> public static IntStream produce(Consumer<IntConsumer> advancer);
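
Of these three, the Predicate<IntConsumer> form maps most directly onto
Spliterator.tryAdvance. A minimal sketch of how it could be backed by a
spliterator follows. The class name, the countdown example and the finished
flag are assumptions of this sketch, not the code from the test class:

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

final class PredicateProduce {
    // Hypothetical helper; no such method exists in the JDK.
    static IntStream produce(Predicate<IntConsumer> producer) {
        return StreamSupport.intStream(
            new Spliterators.AbstractIntSpliterator(Long.MAX_VALUE, Spliterator.ORDERED) {
                private boolean finished;

                @Override
                public boolean tryAdvance(IntConsumer action) {
                    if (finished) {
                        return false;
                    }
                    // The producer either emits one value and returns true,
                    // or returns false without emitting anything.
                    if (producer.test(action)) {
                        return true;
                    }
                    finished = true; // never call the producer again
                    return false;
                }
            }, false);
    }

    // Example source: counts down from the given value to 1.
    static IntStream countdown(int from) {
        int[] cur = {from};
        return produce(action -> {
            if (cur[0] == 0) {
                return false;
            }
            action.accept(cur[0]--);
            return true;
        });
    }
}
```

For example, PredicateProduce.countdown(3) yields 3, 2, 1.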

TFV>> I tried to produce the Collatz sequence starting from a given number and
TFV>> including the trailing one. Unfortunately I don't see how such
TFV>> signatures might simplify things. Here's the three-arg iterate
TFV>> implementation I posted before, for reference:

TFV>> IntStream.iterate(start, val -> val != -1,
TFV>>     val -> val == 1 ? -1 : val % 2 == 0 ? val / 2 : val * 3 + 1);

TFV>> Here's plain-old-loop implementation (without stream):

TFV>> public static void collatzLoop(int start) {
TFV>>   int val = start;
TFV>>   System.out.println(val);
TFV>>   while(val != 1) {
TFV>>     val = val % 2 == 0 ? val / 2 : val * 3 + 1;
TFV>>     System.out.println(val);
TFV>>   }
TFV>> }

TFV>> The problem here is that in the easiest implementation we need to emit
TFV>> the value at two places. Otherwise we either have problems with the
TFV>> last value or with the first value. Here's Supplier<OptionalInt>
TFV>> implementation:

TFV>> public static IntStream collatzSupplier(int start) {
TFV>>   int[] cur = {-1};
TFV>>   return produce(() -> 
TFV>>     cur[0] == 1 ? OptionalInt.empty() 
TFV>>         : OptionalInt.of(cur[0] = 
TFV>>           (cur[0] == -1 ? start : 
TFV>>             cur[0] % 2 == 0 ? 
TFV>>               cur[0] / 2 : cur[0] * 3 + 1)));
TFV>> }

TFV>> Of course we have to maintain the shared state (here I used the dirty
TFV>> one-element array trick). Here we have the same three conditions: to
TFV>> detect the starting case, to detect the finishing case and to separate
TFV>> even/odd cases. We also need some special value. The Predicate-based
TFV>> solution is not simpler and has the same three conditions:

TFV>> public static IntStream collatzPredicate(int start) {
TFV>>   int[] cur = {-1};
TFV>>   return produce(action -> {
TFV>>     if(cur[0] == 1) return false;
TFV>>     action.accept(cur[0] = (
TFV>>         cur[0] == -1 ? start :
TFV>>         cur[0] % 2 == 0 ? cur[0] / 2 : cur[0] * 3 + 1));
TFV>>     return true;
TFV>>   });
TFV>> }

TFV>> So, at least for the Collatz problem, these signatures are not
TFV>> well suited (at least no better than three-arg iterate).

TFV>> Using Consumer<IntConsumer> advancer is somewhat better:

TFV>> public static IntStream collatzConsumer(int start) {
TFV>>   int[] cur = {-1};
TFV>>   return produce(action -> {
TFV>>     if(cur[0] == -1) action.accept(cur[0] = start);
TFV>>     if(cur[0] != 1) action.accept((cur[0] = cur[0] % 2 == 0 ? cur[0] / 2 : cur[0] * 3 + 1));
TFV>>   });
TFV>> }

TFV>> However, the implementation of such a producer is not very good
TFV>> performance-wise, as in tryAdvance we need to buffer the emitted values.
TFV>> forEachRemaining performs much better, though.
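
To make the buffering point concrete, here is a minimal sketch of such a
producer. The ArrayDeque buffer and the class name are assumptions of this
sketch, and the real test code may be organized differently. tryAdvance has
to park the values because the advancer may emit several at once, while
forEachRemaining can hand the downstream action straight to the advancer:

```java
import java.util.ArrayDeque;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

final class ConsumerProduce {
    // Hypothetical helper; no such method exists in the JDK.
    static IntStream produce(Consumer<IntConsumer> advancer) {
        return StreamSupport.intStream(
            new Spliterators.AbstractIntSpliterator(Long.MAX_VALUE, Spliterator.ORDERED) {
                private final ArrayDeque<Integer> buffer = new ArrayDeque<>();
                private boolean finished;

                @Override
                public boolean tryAdvance(IntConsumer action) {
                    if (buffer.isEmpty() && !finished) {
                        advancer.accept(v -> buffer.add(v)); // boxes every value
                        finished = buffer.isEmpty();
                    }
                    if (buffer.isEmpty()) {
                        return false;
                    }
                    action.accept(buffer.poll());
                    return true;
                }

                @Override
                public void forEachRemaining(IntConsumer action) {
                    while (!buffer.isEmpty()) {
                        action.accept(buffer.poll());
                    }
                    if (finished) {
                        return;
                    }
                    boolean[] called = new boolean[1];
                    IntConsumer tracking = v -> { called[0] = true; action.accept(v); };
                    do {
                        called[0] = false;
                        advancer.accept(tracking); // no buffering on this path
                    } while (called[0]);
                    finished = true;
                }
            }, false);
    }

    // The collatzConsumer example from above, unchanged.
    static IntStream collatzConsumer(int start) {
        int[] cur = {-1};
        return produce(action -> {
            if (cur[0] == -1) action.accept(cur[0] = start);
            if (cur[0] != 1) action.accept((cur[0] = cur[0] % 2 == 0 ? cur[0] / 2 : cur[0] * 3 + 1));
        });
    }
}
```

A short-circuiting pipeline such as collatzConsumer(6).limit(3) goes through
the buffered tryAdvance path, while collatzConsumer(6).toArray() takes the
forEachRemaining path.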

TFV>> In general I like the Supplier<OptionalInt> version better (it is also
TFV>> used in your Scanner example), though we need to check how much overhead
TFV>> it creates. We cannot rely on JIT inlining here as functional interfaces
TFV>> are really ubiquitous.

TFV>> Having continuations in Java would actually solve this problem, but I
TFV>> guess we are unlikely to see them in the near future.

TFV>> With best regards,
TFV>> Tagir Valeev.

SM>>> Finally getting back to this.

SM>>> So, now that this has been clarified to model a for-loop, this seems fine. It
SM>>> looks like Paul is sponsoring this for you. Great, let's move ahead with it.

SM>>> As the author of this particular RFE, though, I should say that this isn't what
SM>>> I had in mind when I wrote the RFE. :-)

SM>>> There are two main issues I'm concerned about:

SM>>> 1) modeling loops that aren't for-loops; and

SM>>> 2) modeling loops where the termination condition is known to the stream source
SM>>> but isn't part of the stream value.

SM>>> If covering these requires a different API and RFE, then that's fine. I just
SM>>> want to make sure these cases aren't dropped.

SM>>> For case (1) I think the Collatz example is a good one, since you basically had
SM>>> to cheat in order to get it to work with this API, either by appending 1, or by
SM>>> introducing a sentinel value. It's not always possible to do this.

SM>>> The kind of loops I'm interested in look like this:

SM>>>      do {
SM>>>          emitValue();
SM>>>      } while (condition);

SM>>> or

SM>>>      while (true) {
SM>>>          emitValue();
SM>>>          if (condition)
SM>>>              break;
SM>>>          stuff();
SM>>>      }

SM>>> Clearly it's possible to rearrange these to fit into the 3-arg iterate() method,
SM>>> but it's unnatural to do so. For example, to implement the craps dice game, the
SM>>> obvious Java approach is to maintain the game state in an object or even local
SM>>> variables that are mutated as the game progresses. To use the 3-arg iterate()
SM>>> method, you wrote a State object (a value), which knows how to generate its
SM>>> successor State, and from which the int values are extracted. This is a very
SM>>> nice functional approach, but I expect that many people will have data sources
SM>>> that rely on mutation or external resources. It doesn't seem reasonable to
SM>>> require rewriting the source logic in order to produce a stream from it. And in
SM>>> general it might be arbitrarily difficult.

SM>>> It's probably easier to wrap a Spliterator around the source logic and simply
SM>>> call it from tryAdvance(). In fact, this is what I'd like to see in a 
SM>>> stream-source API, to make it easier to create such stream sources without
SM>>> having to subclass AbstractSpliterator.

SM>>> For (2) it's critical that the stream source have control over production of
SM>>> values. Placing takeWhile() / takeWhileInclusive() downstream doesn't work,
SM>>> since the stream machinery (particularly in parallel) may aggressively pull
SM>>> elements from the source before they're presented to takeWhile().

SM>>> For example, consider parsing int values from a Scanner into an IntStream, until
SM>>> there are no more, leaving the scanner at a known state. Suppose there were an
SM>>> iterate(Supplier<OptionalInt>) overload. Then something like this would work:

SM>>>      Scanner sc = new Scanner("1 2 3 4 foo");
SM>>>      IntStream.iterate(
SM>>>              () -> sc.hasNextInt() ? OptionalInt.of(sc.nextInt())
SM>>>                                    : OptionalInt.empty())
SM>>>          .forEach(System.out::println);

SM>>>      assert sc.hasNext();
SM>>>      System.out.println(sc.next()); // prints "foo"
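
Since no such overload exists, the snippet above can only be tried with a
hand-rolled helper. A minimal sketch, assuming a hypothetical
produce(Supplier<OptionalInt>) built on AbstractIntSpliterator (the helper
and class names are made up for illustration):

```java
import java.util.OptionalInt;
import java.util.Scanner;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

final class ScannerProduce {
    // Hypothetical helper; no such method exists in the JDK.
    static IntStream produce(Supplier<OptionalInt> producer) {
        return StreamSupport.intStream(
            new Spliterators.AbstractIntSpliterator(Long.MAX_VALUE, Spliterator.ORDERED) {
                private boolean finished;

                @Override
                public boolean tryAdvance(IntConsumer action) {
                    if (finished) {
                        return false;
                    }
                    OptionalInt next = producer.get();
                    if (!next.isPresent()) {
                        finished = true; // the source is not consulted again
                        return false;
                    }
                    action.accept(next.getAsInt());
                    return true;
                }
            }, false);
    }

    // Reads ints until the next token is not an int; that token stays in the scanner.
    static IntStream ints(Scanner sc) {
        return produce(() -> sc.hasNextInt() ? OptionalInt.of(sc.nextInt())
                                             : OptionalInt.empty());
    }

    public static void main(String[] args) {
        Scanner sc = new Scanner("1 2 3 4 foo");
        ints(sc).forEach(System.out::println);
        System.out.println(sc.next()); // prints "foo"
    }
}
```

Because the spliterator pulls one element per tryAdvance call and stops at
the first empty optional, a sequential run leaves the scanner positioned at
"foo".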

SM>>> Boxing things into optionals is a bit cumbersome, and I'm also concerned about
SM>>> the overhead. But it illustrates the point that the termination condition is
SM>>> separate from the values that go into the stream. In particular there's no
SM>>> "seed" value to which a termination condition can be applied. (And "iterate"
SM>>> might not be the best name for this method, either.)

SM>>> Oh well, I think this is turning into another RFE.

SM>>> s'marks




SM>>> On 2/16/16 9:48 PM, Tagir F. Valeev wrote:
>>>> Hello, Stuart!
>>>>
>>>> Thank you for your comments.
>>>>
>>>> SM> I'd suggest focusing on the API first before worrying about how to track the
>>>> SM> stream state with booleans, etc. Is the API convenient to use, and how well does
>>>> SM> it support the use cases we envision for it?
>>>>
>>>> As Brian already noted, the main benefit of such a signature is its
>>>> resemblance to the good old for loop. It's also good because the
>>>> lambdas don't need to maintain external mutable state in this case
>>>> (the state is encapsulated in the current element). Most of your
>>>> proposed examples, however, need to do it as they don't receive the
>>>> existing state. Also I see no reason to create a method which is
>>>> compatible with iterator::hasNext/iterator::next or even
>>>> spliterator::tryAdvance. If you already have a spliterator, you can
>>>> create a stream using StreamSupport.stream(spliterator, false). If you
>>>> have an iterator, you can convert it to a spliterator using
>>>> Spliterators.spliterator[UnknownSize]. Well, this probably looks ugly,
>>>> but it is more flexible and signals that some low-level stuff is performed.
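
For reference, a minimal sketch of that iterator-to-stream conversion using
only existing JDK calls (the wrapper class and method names are made up for
illustration):

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class IteratorToStream {
    // Wraps an iterator of unknown size into a sequential, ordered stream.
    static <T> Stream<T> stream(Iterator<? extends T> iterator) {
        Spliterator<T> spliterator =
            Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
        return StreamSupport.stream(spliterator, false);
    }
}
```

If the size is known up front, Spliterators.spliterator(iterator, size,
characteristics) can be used instead.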
>>>>
>>>> Supplier<Stream<T>> is definitely bad in terms of performance.
>>>> Creating a new stream is far from free. To illustrate this I wrote a
>>>> simple benchmark which compares .flatMapToInt(OptionalInt::stream)
>>>> with the Java 8 way, .filter(OptionalInt::isPresent).mapToInt(OptionalInt::getAsInt)
>>>>
>>>> Here's source code and full output:
>>>> http://cr.openjdk.java.net/~tvaleev/jmh/optionalstream/
>>>>
>>>> Benchmark                               (n)  Mode  Cnt      Score     Error  Units
>>>> OptionalTest.testOptionalFilterGet       10  avgt   30      0,171 ±   0,011  us/op
>>>> OptionalTest.testOptionalFilterGet     1000  avgt   30      6,295 ±   0,046  us/op
>>>> OptionalTest.testOptionalFilterGet  1000000  avgt   30  12597,706 ±  69,214  us/op
>>>> OptionalTest.testOptionalStream          10  avgt   30      0,330 ±   0,002  us/op
>>>> OptionalTest.testOptionalStream        1000  avgt   30     27,552 ±   0,577  us/op
>>>> OptionalTest.testOptionalStream     1000000  avgt   30  30837,240 ± 812,420  us/op
>>>>
>>>> Involving intermediate streams makes the thing at least twice as slow.
>>>> Surely this delay could become negligible in some scenarios, but I
>>>> think it's unacceptable to force users to create a new source with a
>>>> bunch of streams. At least primitive specializations will become
>>>> meaningless in this case: boxing would eat much less time compared to
>>>> stream creation.
>>>>
>>>> As for elements drawn from the queue, it's much better to use the existing
>>>> takeWhile method:
>>>>
>>>> queue.stream().takeWhile(x -> !x.equals(sentinel));
>>>>
>>>> True, such an approach will not include the sentinel element in the
>>>> result, and there's no easy way to do it with the current API. Probably
>>>> an additional method (takeWhileInclusive?) could be considered to solve
>>>> such problems. Still, I think, drawing from the queue is not the
>>>> problem which should be solved with the new iterate() method.
>>>>
>>>> As for Collatz conjecture, it's quite easy to iterate without trailing
>>>> one:
>>>>
>>>> IntStream.iterate(start, val -> val != 1,
>>>>    val -> val % 2 == 0 ? val / 2 : val * 3 + 1)
>>>>
>>>> If having one is desired, then it would be easier just to append one
>>>> to the stream (even if Collatz conjecture is false we will have an
>>>> infinite stream, so appended one will never appear):
>>>>
>>>> IntStream.concat(
>>>>    IntStream.iterate(start, val -> val != 1,
>>>>        val -> val % 2 == 0 ? val / 2 : val * 3 + 1),
>>>>    IntStream.of(1))
>>>>
>>>> A side note: having IntStream.append(int... numbers) would be really
>>>> nice:
>>>>
>>>> IntStream.iterate(start, val -> val != 1,
>>>>    val -> val % 2 == 0 ? val / 2 : val * 3 + 1).append(1)
>>>>
>>>> Another approach would be to introduce a special stop value (for
>>>> example, -1):
>>>>
>>>> IntStream.iterate(start, val -> val != -1,
>>>>    val -> val == 1 ? -1 : val % 2 == 0 ? val / 2 : val * 3 + 1)
>>>>
>>>> This stream produces Collatz series, including the trailing one.
>>>>
>>>> As for Craps, I had never heard of this game. If I understood the rules
>>>> correctly, it's good to represent the state as a separate object and
>>>> define the state transition via its method. Something like this should
>>>> work:
>>>>
>>>> Random r = new Random();
>>>> IntSupplier dice = () -> r.nextInt(6)+r.nextInt(6)+2;
>>>> class State {
>>>>    int roll, point;
>>>>
>>>>    State(int roll, int point) {
>>>>      this.roll = roll;
>>>>      this.point = point;
>>>>    }
>>>>
>>>>    State() {
>>>>      this(dice.getAsInt(), 0);
>>>>    }
>>>>
>>>>    boolean isStopRound() {
>>>>      return roll == 7 || (point == 0 && (roll > 10 || roll < 4)) || (point != 0 && roll == point);
>>>>    }
>>>>
>>>>    State next() {
>>>>      return isStopRound() ? null : new State(dice.getAsInt(), point == 0 ? roll : point);
>>>>    }
>>>> }
>>>> Stream.iterate(new State(), Objects::nonNull, State::next)
>>>>        .mapToInt(state -> state.roll)
>>>>        .forEach(System.out::println);
>>>>
>>>> With best regards,
>>>> Tagir Valeev.
>>>>
>>>>
>>>> SM> In particular, I can imagine a number of cases where it would be very helpful to
>>>> SM> be able to support an empty stream, or where the computation to produce the
>>>> SM> first element is the same as the computation to produce subsequent elements.
>>>> SM> Requiring a value for the first stream element is at odds with that.
>>>>
>>>> SM> Here are some ideas for use cases to try out:
>>>>
>>>> SM>   - a series of dice rolls representing a round of craps [1]
>>>> SM>   - elements drawn from a queue until the queue is empty or until
>>>> SM>     a sentinel is reached
>>>> SM>   - a sequence of numbers that (probably) terminates but whose length
>>>> SM>     isn't necessarily known in advance (e.g. Collatz sequence [2])
>>>>
>>>> SM> [1] https://en.wikipedia.org/wiki/Craps
>>>>
>>>> SM> [2] https://en.wikipedia.org/wiki/Collatz_conjecture
>>>>
>>>> SM> Note that in some cases the sentinel value that terminates the stream should be
>>>> SM> part of the stream, and in other cases it's not.
>>>>
>>>> SM> I'm sure you can find more use cases by perusing Stack Overflow. :-)
>>>>
>>>> SM> I'm a bit skeptical of the use of "iterate" for producing a finite stream. There
>>>> SM> are the usual issues with overloading, but there's also potential confusion as
>>>> SM> some forms of iterate() are infinite and others finite. I'll suggest the name
>>>> SM> "produce" instead, but there are surely better terms.
>>>>
>>>> SM> One thing to think about is where the state of the producer is stored. Is it
>>>> SM> expected to be in an argument that's passed to each invocation of the functional
>>>> SM> argument, or is it expected to be captured? I don't think there's an answer in
>>>> SM> isolation; examining use cases would probably shed some light here.
>>>>
>>>> SM> Here are a few API ideas (wildcards elided):
>>>>
>>>> SM> --
>>>>
>>>> SM> <T> Stream<T> iterate(T seed, Predicate<T> predicate, UnaryOperator<T> f)
>>>>
>>>> SM> The API from your proposal, for comparison purposes.
>>>>
>>>> SM> --
>>>>
>>>> SM> <T> Stream<T> produce(Supplier<Optional<T>>)
>>>>
>>>> SM> Produces elements until an empty Optional is returned. This boxes/unboxes
>>>> SM> every element, maybe(?) alleviated by Valhalla.
>>>>
>>>> SM> --
>>>>
>>>> SM> <T> Stream<T> produce(BooleanSupplier, Supplier<T>)
>>>>
>>>> SM> Calls the BooleanSupplier; if true the next stream element is what's returned by
>>>> SM> calling the Supplier. If BooleanSupplier returns false, end of stream. If you
>>>> SM> have an iterator already, this enables
>>>>
>>>> SM> produce(iterator::hasNext, iterator::next)
>>>>
>>>> SM> But if you don't have an iterator already, coming up with the functions to
>>>> SM> satisfy the iterator-style protocol is sometimes painful.
>>>>
>>>> SM> --
>>>>
>>>> SM> <T> Stream<T> produce(Predicate<Consumer<T>> advancer)
>>>>
>>>> SM> This has an odd signature, but the function is like Spliterator.tryAdvance(). It
>>>> SM> must either call the consumer once and return true, or return false without
>>>> SM> calling the consumer.
>>>>
>>>> SM> --
>>>>
>>>> SM> <T> Stream<T> produce(Consumer<Consumer<T>> advancer)
>>>>
>>>> SM> A variation of the above, without a boolean return. The advancer calls the
>>>> SM> consumer one or more times to add elements to the stream. End of stream occurs
>>>> SM> when the advancer doesn't call the consumer.
>>>>
>>>> SM> --
>>>>
>>>> SM> <T> Stream<T> produce(Supplier<Stream<T>>)
>>>>
>>>> SM> A variation of Supplier<Optional<T>> where the supplier returns a stream
>>>> SM> containing zero or more elements. The stream terminates if the supplier returns
>>>> SM> an empty stream. There's "boxing" overhead here, but we don't seem to be bothered
>>>> SM> by this with flatMap().
>>>>
>>>> SM> --
>>>>
>>>> SM> s'marks
>>>>
>>>>
>>>> SM> On 2/14/16 6:53 AM, Tagir F. Valeev wrote:
>>>>>> Hello!
>>>>>>
>>>>>> I wanted to work on foldLeft, but Brian asked me to take this issue
>>>>>> instead. So here's webrev:
>>>>>> http://cr.openjdk.java.net/~tvaleev/webrev/8072727/r1/
>>>>>>
>>>>>> I don't like iterator-based Stream source implementations, so I made
>>>>>> them AbstractSpliterator-based. I also implemented forEachRemaining
>>>>>> manually as, I believe, this improves the performance in
>>>>>> non-short-circuiting cases.
>>>>>>
>>>>>> I also decided to keep two flags (started and finished) to track the
>>>>>> state. The existing implementation of infinite iterate() does
>>>>>> not use a started flag, but instead reads one element ahead for
>>>>>> primitive streams. This seems wrong to me and may even lead to
>>>>>> unexpected exceptions (*). I could get rid of "started" flag for
>>>>>> Stream.iterate() using Streams.NONE, but this would make object
>>>>>> implementation different from primitive implementations. It would also
>>>>>> be possible to keep single three-state variable (byte or int,
>>>>>> NOT_STARTED, STARTED, FINISHED), but I doubt that this would improve
>>>>>> the performance or footprint. Having two flags looks more readable to
>>>>>> me.
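
The two-flag scheme described above can be sketched as follows. This is an
illustration written for this message and not the webrev code; the class
name is made up, and the hand-written forEachRemaining is omitted for
brevity:

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

final class IterateSketch {
    static IntStream iterate(int seed, IntPredicate hasNext, IntUnaryOperator next) {
        return StreamSupport.intStream(
            new Spliterators.AbstractIntSpliterator(
                    Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.IMMUTABLE) {
                private int prev;
                private boolean started;
                private boolean finished;

                @Override
                public boolean tryAdvance(IntConsumer action) {
                    if (finished) {
                        return false;
                    }
                    int t;
                    if (started) {
                        t = next.applyAsInt(prev); // computed only on demand, no read-ahead
                    } else {
                        t = seed;
                        started = true;
                    }
                    if (!hasNext.test(t)) {
                        finished = true;
                        return false;
                    }
                    action.accept(prev = t);
                    return true;
                }
            }, false);
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, -1};
        // Prints 0, 1, 2, 3, 4; data[-1] is never evaluated.
        iterate(0, x -> x >= 0, x -> data[x]).forEach(System.out::println);
    }
}
```

Because the next element is computed only when it is requested, the
function is never applied to a value that failed the predicate.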
>>>>>>
>>>>>> The existing two-arg iterate methods can now be expressed as a
>>>>>> special case of the new method:
>>>>>>
>>>>>> public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {
>>>>>>       return iterate(seed, x -> true, f);
>>>>>> }
>>>>>> (same for primitive streams). I may do this if you think it's
>>>>>> reasonable.
>>>>>>
>>>>>> I created new test class and added new iterate sources to existing
>>>>>> data providers.
>>>>>>
>>>>>> Please review and sponsor!
>>>>>>
>>>>>> With best regards,
>>>>>> Tagir Valeev.
>>>>>>
>>>>>> (*) Consider the following code:
>>>>>>
>>>>>> int[] data = {1,2,3,4,-1};
>>>>>> IntStream.iterate(0, x -> data[x])
>>>>>>            .takeWhile(x -> x >= 0)
>>>>>>            .forEach(System.out::println);
>>>>>>
>>>>>> Currently this unexpectedly throws an AIOOBE, because
>>>>>> IntStream.iterate unnecessarily tries to read one element ahead.
>>>>>>
>>>>