RFR: 8072727 - add variation of Stream.iterate() that's finite
Tagir F. Valeev
amaembo at gmail.com
Fri Mar 11 08:27:19 UTC 2016
Using the IntEmitter, the Craps round could also be implemented easily,
without additional classes:
public static IntEmitter crapsEmitter(IntSupplier dice, int point) {
    return action -> {
        int roll = dice.getAsInt();
        action.accept(roll);
        // The round ends on a 7, on 2, 3, 11 or 12 at the come-out roll
        // (point == 0), or when the established point is rolled again.
        return roll == 7 || (point == 0 && (roll > 10 || roll < 4)) || (point != 0 && roll == point)
                ? null : crapsEmitter(dice, point == 0 ? roll : point);
    };
}
Usage:
Random r = new Random();
IntSupplier dice = () -> r.nextInt(6) + r.nextInt(6) + 2;
crapsEmitter(dice, 0).stream().forEach(System.out::println);
TFV> Here's the source code I wrote for this test:
TFV> http://cr.openjdk.java.net/~tvaleev/patches/produce/ProduceTest.java
TFV> I also have a somewhat weird idea of emitters (also added to this
TFV> test class). Here's an example:
TFV> public static IntEmitter collatzEmitter(int val) {
TFV>     return action -> {
TFV>         action.accept(val);
TFV>         return val == 1 ? null : collatzEmitter(val % 2 == 0 ? val / 2 : val * 3 + 1);
TFV>     };
TFV> }
TFV> Usage:
TFV> collatzEmitter(17).stream().forEach(System.out::println);
TFV> It needs neither a special sentinel value nor special handling of the
TFV> starting value, and no additional code is needed for finishing.
TFV> However, such a solution surely produces garbage (at least one lambda
TFV> instance per iteration), so performance would degrade. Still, I
TFV> like it.
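The IntEmitter type itself lives in ProduceTest.java and is not shown in this message. Below is a minimal sketch that assumes an emitter is a function taking an IntConsumer, emitting exactly one value, and returning the emitter for the rest of the sequence (or null after the last value). The method name next and the spliterator-based stream() are illustrative guesses, not necessarily what the test class does.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

// Hypothetical reconstruction; the real IntEmitter in ProduceTest.java may differ.
@FunctionalInterface
interface IntEmitter {
    // Assumed contract: emit exactly one value to the action, then return the
    // emitter for the remaining values, or null if this was the last value.
    IntEmitter next(IntConsumer action);

    default IntStream stream() {
        IntEmitter first = this;
        return StreamSupport.intStream(new Spliterators.AbstractIntSpliterator(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            private IntEmitter current = first; // null once the sequence is exhausted

            @Override
            public boolean tryAdvance(IntConsumer action) {
                if (current == null) {
                    return false;
                }
                // A capturing emitter allocates a new lambda instance per step,
                // which is the garbage mentioned above.
                current = current.next(action);
                return true;
            }
        }, false);
    }
}

class Emitters {
    static IntEmitter collatzEmitter(int val) {
        return action -> {
            action.accept(val);
            return val == 1 ? null : collatzEmitter(val % 2 == 0 ? val / 2 : val * 3 + 1);
        };
    }
}
```

With this sketch, Emitters.collatzEmitter(6).stream().toArray() yields 6, 3, 10, 5, 16, 8, 4, 2, 1. No sentinel is needed because a null successor is the end-of-stream signal.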
TFV> With best regards,
TFV> Tagir Valeev.
TFV>> Hello!
TFV>> Just for your information, I implemented three alternative stream
TFV>> producers as you suggested:
TFV>> // Produces a stream of values returned by the producer until the first
TFV>> // empty optional is produced
TFV>> public static IntStream produce(Supplier<OptionalInt> producer);
TFV>> // Produces a stream of values until the predicate returns false;
TFV>> // iff the predicate returns true, it must call the supplied IntConsumer
TFV>> // exactly once (like in Spliterator.tryAdvance)
TFV>> public static IntStream produce(Predicate<IntConsumer> producer);
TFV>> // Produces a stream of values until the advancer does not call the
TFV>> // IntConsumer at all.
TFV>> // Calling the IntConsumer multiple times is also acceptable.
TFV>> public static IntStream produce(Consumer<IntConsumer> advancer);
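The first two signatures map almost directly onto Spliterator.tryAdvance. A minimal sketch, assuming a hypothetical Producers class holding these produce overloads (this is not the ProduceTest.java code): the Predicate form wraps the predicate in an AbstractIntSpliterator, and the Supplier<OptionalInt> form is layered on top of it.

```java
import java.util.OptionalInt;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

class Producers {
    // Hypothetical produce(Predicate<IntConsumer>): the predicate follows the
    // Spliterator.tryAdvance contract (call the consumer once and return true,
    // or return false without calling it).
    static IntStream produce(Predicate<IntConsumer> producer) {
        return StreamSupport.intStream(new Spliterators.AbstractIntSpliterator(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            private boolean finished;

            @Override
            public boolean tryAdvance(IntConsumer action) {
                if (finished) {
                    return false;
                }
                if (producer.test(action)) {
                    return true;
                }
                finished = true; // do not call the producer again after it reported the end
                return false;
            }
        }, false);
    }

    // Hypothetical produce(Supplier<OptionalInt>): an empty OptionalInt ends the stream.
    static IntStream produce(Supplier<OptionalInt> producer) {
        return produce((IntConsumer action) -> {
            OptionalInt next = producer.get();
            next.ifPresent(action);
            return next.isPresent();
        });
    }
}
```

For example, Producers.produce(() -> sc.hasNextInt() ? OptionalInt.of(sc.nextInt()) : OptionalInt.empty()) over a Scanner on "1 2 3 4 foo" yields 1, 2, 3, 4 and leaves "foo" unread in the scanner.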
TFV>> I tried to produce the Collatz sequence starting from a given number and
TFV>> including the trailing one. Unfortunately, I don't see how such
TFV>> signatures would simplify things. Here's the three-arg iterate
TFV>> implementation I posted before, for reference:
TFV>> IntStream.iterate(start, val -> val != -1,
TFV>>         val -> val == 1 ? -1 : val % 2 == 0 ? val / 2 : val * 3 + 1);
TFV>> Here's a plain-old-loop implementation (without streams):
TFV>> public static void collatzLoop(int start) {
TFV>>     int val = start;
TFV>>     System.out.println(val);
TFV>>     while (val != 1) {
TFV>>         val = val % 2 == 0 ? val / 2 : val * 3 + 1;
TFV>>         System.out.println(val);
TFV>>     }
TFV>> }
TFV>> The problem is that the simplest implementation needs to emit the
TFV>> value in two places; otherwise we have trouble with either the last
TFV>> value or the first one. Here's the Supplier<OptionalInt>
TFV>> implementation:
TFV>> public static IntStream collatzSupplier(int start) {
TFV>>     int[] cur = {-1};
TFV>>     return produce(() ->
TFV>>             cur[0] == 1 ? OptionalInt.empty()
TFV>>                     : OptionalInt.of(cur[0] =
TFV>>                             (cur[0] == -1 ? start :
TFV>>                              cur[0] % 2 == 0 ?
TFV>>                              cur[0] / 2 : cur[0] * 3 + 1)));
TFV>> }
TFV>> Of course, we have to maintain shared state (here I used the dirty
TFV>> one-element array trick). We have the same three conditions: one to
TFV>> detect the starting case, one to detect the finishing case and one to
TFV>> separate the even/odd cases. We also need a special value. The
TFV>> Predicate-based solution is not simpler and has the same three conditions:
TFV>> public static IntStream collatzPredicate(int start) {
TFV>>     int[] cur = {-1};
TFV>>     return produce(action -> {
TFV>>         if (cur[0] == 1) return false;
TFV>>         action.accept(cur[0] = (
TFV>>                 cur[0] == -1 ? start :
TFV>>                 cur[0] % 2 == 0 ? cur[0] / 2 : cur[0] * 3 + 1));
TFV>>         return true;
TFV>>     });
TFV>> }
TFV>> So, at least for the Collatz problem, these signatures are not well
TFV>> suited (they are no better than the three-arg iterate).
TFV>> Using the Consumer<IntConsumer> advancer is somewhat better:
TFV>> public static IntStream collatzConsumer(int start) {
TFV>>     int[] cur = {-1};
TFV>>     return produce(action -> {
TFV>>         if (cur[0] == -1) action.accept(cur[0] = start);
TFV>>         if (cur[0] != 1) action.accept(cur[0] = cur[0] % 2 == 0 ? cur[0] / 2 : cur[0] * 3 + 1);
TFV>>     });
TFV>> }
TFV>> However, the implementation of such a producer does not perform well,
TFV>> as tryAdvance has to buffer the emitted values. forEachRemaining
TFV>> performs much better, though.
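The buffering comes from a mismatch: tryAdvance must hand out exactly one value per call, while the advancer may emit several, so the surplus has to be stored; forEachRemaining can pass everything straight through. A minimal sketch of such a produce(Consumer<IntConsumer>), in a hypothetical AdvancerProducer class (not the ProduceTest.java implementation):

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

class AdvancerProducer {
    // Hypothetical: each advancer call emits zero or more values;
    // a call that emits nothing ends the stream.
    static IntStream produce(Consumer<IntConsumer> advancer) {
        return StreamSupport.intStream(new Spliterators.AbstractIntSpliterator(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            private int[] buf = new int[4]; // values emitted but not yet consumed
            private int head, tail;         // pending values are buf[head..tail)
            private boolean finished;

            private void push(int value) {
                if (tail == buf.length) {
                    buf = Arrays.copyOf(buf, buf.length * 2);
                }
                buf[tail++] = value;
            }

            @Override
            public boolean tryAdvance(IntConsumer action) {
                while (head == tail) {           // buffer is empty
                    if (finished) {
                        return false;
                    }
                    head = tail = 0;
                    advancer.accept(this::push); // buffer everything this step emits
                    if (head == tail) {
                        finished = true;         // nothing emitted: end of stream
                    }
                }
                action.accept(buf[head++]);
                return true;
            }

            @Override
            public void forEachRemaining(IntConsumer action) {
                while (head < tail) {            // drain values buffered by tryAdvance
                    action.accept(buf[head++]);
                }
                if (finished) {
                    return;
                }
                boolean[] emitted = new boolean[1];
                IntConsumer passThrough = value -> {
                    emitted[0] = true;
                    action.accept(value);        // no buffering on this path
                };
                do {
                    emitted[0] = false;
                    advancer.accept(passThrough);
                } while (emitted[0]);
                finished = true;
            }
        }, false);
    }
}
```

Fed with the collatzConsumer lambda above and a start of 6, this produces 6, 3, 10, 5, 16, 8, 4, 2, 1; only the tryAdvance path (used by short-circuiting pipelines such as limit) touches the buffer.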
TFV>> In general, I like the Supplier<OptionalInt> version better (it is also
TFV>> used in your Scanner example), though we need to check how much
TFV>> overhead it creates. We cannot rely on JIT inlining here, as functional
TFV>> interfaces are really ubiquitous.
TFV>> Having continuations in Java would actually solve this problem, but I
TFV>> guess we are unlikely to see them in the near future.
TFV>> With best regards,
TFV>> Tagir Valeev.
SM>>> Finally getting back to this.
SM>>> So, now that this has been clarified to model a for-loop, this seems fine. It
SM>>> looks like Paul is sponsoring this for you. Great, let's move ahead with it.
SM>>> As the author of this particular RFE, though, I should say that this isn't what
SM>>> I had in mind when I wrote the RFE. :-)
SM>>> There are two main issues I'm concerned about:
SM>>> 1) modeling loops that aren't for-loops; and
SM>>> 2) modeling loops where the termination condition is known to the stream source
SM>>> but isn't part of the stream value.
SM>>> If covering these requires a different API and RFE, then that's fine. I just
SM>>> want to make sure these cases aren't dropped.
SM>>> For case (1) I think the Collatz example is a good one, since you basically had
SM>>> to cheat in order to get it to work with this API, either by appending 1, or by
SM>>> introducing a sentinel value. It's not always possible to do this.
SM>>> The kind of loops I'm interested in look like this:
SM>>> do {
SM>>>     emitValue();
SM>>> } while (condition);
SM>>> or
SM>>> while (true) {
SM>>>     emitValue();
SM>>>     if (condition)
SM>>>         break;
SM>>>     stuff();
SM>>> }
SM>>> Clearly it's possible to rearrange these to fit into the 3-arg iterate() method,
SM>>> but it's unnatural to do so. For example, to implement the craps dice game, the
SM>>> obvious Java approach is to maintain the game state in an object or even local
SM>>> variables that are mutated as the game progresses. To use the 3-arg iterate()
SM>>> method, you wrote a State object (a value), which knows how to generate its
SM>>> successor State, and from which the int values are extracted. This is a very
SM>>> nice functional approach, but I expect that many people will have data sources
SM>>> that rely on mutation or external resources. It doesn't seem reasonable to
SM>>> require rewriting the source logic in order to produce a stream from it. And in
SM>>> general it might be arbitrarily difficult.
SM>>> It's probably easier to wrap a Spliterator around the source logic and simply
SM>>> call it from tryAdvance(). In fact, this is what I'd like to see in a
SM>>> stream-source API, to make it easier to create such stream sources without
SM>>> having to subclass AbstractSpliterator.
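For comparison, here is roughly what wrapping a Spliterator around mutable source logic looks like today, without any new API. This is a sketch built around a hypothetical CrapsRound.round(IntSupplier) method: the game state (the point, and whether the round is over) lives in mutable fields of an AbstractIntSpliterator subclass, and the end-of-round rule is the one used elsewhere in this thread.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

class CrapsRound {
    // Hypothetical helper: the rolls of one round of craps, as an IntStream.
    static IntStream round(IntSupplier dice) {
        return StreamSupport.intStream(new Spliterators.AbstractIntSpliterator(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            private int point;    // 0 until a point is established
            private boolean over; // set once the round has ended

            @Override
            public boolean tryAdvance(IntConsumer action) {
                if (over) {
                    return false;
                }
                int roll = dice.getAsInt();
                action.accept(roll);
                // A 7 always ends the round; on the come-out roll so do 2, 3, 11
                // and 12; after that, rolling the point again ends it.
                over = roll == 7 || (point == 0 && (roll > 10 || roll < 4))
                        || (point != 0 && roll == point);
                if (!over && point == 0) {
                    point = roll; // a come-out roll that did not end the round sets the point
                }
                return true;
            }
        }, false);
    }
}
```

Usage would be along the lines of CrapsRound.round(() -> r.nextInt(6) + r.nextInt(6) + 2).forEach(System.out::println). The source logic stays an ordinary imperative loop body, at the cost of subclassing AbstractIntSpliterator.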
SM>>> For (2) it's critical that the stream source have control over production of
SM>>> values. Placing takeWhile() / takeWhileInclusive() downstream doesn't work,
SM>>> since the stream machinery (particularly in parallel) may aggressively pull
SM>>> elements from the source before they're presented to takeWhile().
SM>>> For example, consider parsing int values from a Scanner into an IntStream, until
SM>>> there are no more, leaving the scanner at a known state. Suppose there were an
SM>>> iterate(Supplier<OptionalInt>) overload. Then something like this would work:
SM>>> Scanner sc = new Scanner("1 2 3 4 foo");
SM>>> IntStream.iterate(
SM>>>         () -> sc.hasNextInt() ? OptionalInt.of(sc.nextInt())
SM>>>                               : OptionalInt.empty())
SM>>>     .forEach(System.out::println);
SM>>> assert sc.hasNext();
SM>>> System.out.println(sc.next()); // prints "foo"
SM>>> Boxing things into optionals is a bit cumbersome, and I'm also concerned about
SM>>> the overhead. But it illustrates the point that the termination condition is
SM>>> separate from the values that go into the stream. In particular there's no
SM>>> "seed" value to which a termination condition can be applied. (And "iterate"
SM>>> might not be the best name for this method, either.)
SM>>> Oh well, I think this is turning into another RFE.
SM>>> s'marks
SM>>> On 2/16/16 9:48 PM, Tagir F. Valeev wrote:
>>>> Hello, Stuart!
>>>>
>>>> Thank you for your comments.
>>>>
>>>> SM> I'd suggest focusing on the API first before worrying about how to track the
>>>> SM> stream state with booleans, etc. Is the API convenient to use, and how well does
>>>> SM> it support the use cases we envision for it?
>>>>
>>>> As Brian already noted, the main benefit of such a signature is its
>>>> resemblance to the good old for loop. It's also good because the
>>>> lambdas don't need to maintain external mutable state in this case
>>>> (the state is encapsulated in the current element). Most of your
>>>> proposed examples, however, need to do so, as they don't receive the
>>>> existing state. Also, I see no reason to create a method that is
>>>> compatible with iterator::hasNext/iterator::next or even
>>>> spliterator::tryAdvance. If you already have a spliterator, you can
>>>> create a stream using StreamSupport.stream(spliterator, false). If you
>>>> have an iterator, you can convert it to a spliterator using
>>>> Spliterators.spliterator[UnknownSize]. This probably looks ugly, but
>>>> it is more flexible and signals that some low-level stuff is going on.
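For reference, the iterator route with the existing JDK API is Spliterators.spliteratorUnknownSize followed by StreamSupport.stream; the IteratorStreams wrapper class below is just for illustration.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class IteratorStreams {
    static <T> Stream<T> fromIterator(Iterator<T> iterator) {
        // Wrap the iterator; ORDERED preserves its iteration order.
        Spliterator<T> spliterator =
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
        // false: create a sequential stream
        return StreamSupport.stream(spliterator, false);
    }
}
```

When the number of elements is known up front, Spliterators.spliterator(iterator, size, characteristics) can be used instead.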
>>>>
>>>> Supplier<Stream<T>> is definitely bad in terms of performance.
>>>> Creating a new stream is far from free. To illustrate this, I wrote a
>>>> simple benchmark that compares .flatMapToInt(OptionalInt::stream)
>>>> with the Java 8 way, .filter(OptionalInt::isPresent).mapToInt(OptionalInt::getAsInt).
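The two pipelines under comparison look roughly like this. The OptionalPipelines class and the sum() terminal operation are assumptions for illustration only; the actual benchmark code is at the link that follows. OptionalInt.stream() requires Java 9.

```java
import java.util.List;
import java.util.OptionalInt;

class OptionalPipelines {
    // Creates a one-element (or empty) IntStream for every input element.
    static int sumViaStream(List<OptionalInt> input) {
        return input.stream().flatMapToInt(OptionalInt::stream).sum();
    }

    // Java 8 style: no intermediate streams are created.
    static int sumViaFilter(List<OptionalInt> input) {
        return input.stream()
                .filter(OptionalInt::isPresent)
                .mapToInt(OptionalInt::getAsInt)
                .sum();
    }
}
```

Both compute the same result; the difference measured below is the cost of creating one small stream per element.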
>>>>
>>>> Here's source code and full output:
>>>> http://cr.openjdk.java.net/~tvaleev/jmh/optionalstream/
>>>>
>>>> Benchmark                               (n)  Mode  Cnt      Score     Error  Units
>>>> OptionalTest.testOptionalFilterGet       10  avgt   30      0.171 ±   0.011  us/op
>>>> OptionalTest.testOptionalFilterGet     1000  avgt   30      6.295 ±   0.046  us/op
>>>> OptionalTest.testOptionalFilterGet  1000000  avgt   30  12597.706 ±  69.214  us/op
>>>> OptionalTest.testOptionalStream          10  avgt   30      0.330 ±   0.002  us/op
>>>> OptionalTest.testOptionalStream        1000  avgt   30     27.552 ±   0.577  us/op
>>>> OptionalTest.testOptionalStream     1000000  avgt   30  30837.240 ± 812.420  us/op
>>>>
>>>> Involving intermediate streams makes things roughly two to four times
>>>> slower here (from about 1.9x for n=10 to about 4.4x for n=1000).
>>>> Surely this delay could become negligible in some scenarios, but I
>>>> think it's unacceptable to force users to build a new source out of
>>>> a bunch of streams. In particular, primitive specializations would
>>>> become meaningless in this case: boxing costs much less time than
>>>> stream creation.
>>>>
>>>> As for elements drawn from a queue, it's much better to use the
>>>> existing takeWhile method:
>>>>
>>>> queue.stream().takeWhile(x -> !x.equals(sentinel));
>>>>
>>>> True, this approach will not include the sentinel element in the
>>>> result, and there's no easy way to do that with the current API.
>>>> Perhaps an additional method (takeWhileInclusive?) could be considered
>>>> to solve such problems. Still, I think drawing from a queue is not a
>>>> problem that the new iterate() method should solve.
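To make the difference concrete: takeWhile (Java 9) drops the sentinel, while an inclusive variant keeps it. takeWhileInclusive is not a JDK method; the version below is a hypothetical, sequential-only sketch that wraps the source spliterator. A list stands in for the queue to keep the example short.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class TakeWhileInclusive {
    // Hypothetical helper: like takeWhile, but also emits the first element
    // that fails the predicate, then stops. Sequential only.
    static <T> Stream<T> takeWhileInclusive(Stream<T> stream, Predicate<? super T> predicate) {
        Spliterator<T> source = stream.spliterator();
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(
                source.estimateSize(), source.characteristics() & Spliterator.ORDERED) {
            private boolean done;

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (done) {
                    return false;
                }
                return source.tryAdvance(t -> {
                    if (!predicate.test(t)) {
                        done = true; // emit this element, then stop
                    }
                    action.accept(t);
                });
            }
        }, false);
    }
}
```

With the elements 3, 1, 0, 4 and a sentinel of 0, takeWhile(x -> !x.equals(sentinel)) gives 3, 1, while the inclusive sketch gives 3, 1, 0.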
>>>>
>>>> As for the Collatz conjecture, it's quite easy to iterate without the
>>>> trailing one:
>>>>
>>>> IntStream.iterate(start, val -> val != 1,
>>>> val -> val % 2 == 0 ? val / 2 : val * 3 + 1)
>>>>
>>>> If the trailing one is desired, it would be easier just to append it
>>>> to the stream (even if the Collatz conjecture is false, we will get an
>>>> infinite stream, so the appended one will never appear):
>>>>
>>>> IntStream.concat(
>>>>         IntStream.iterate(start, val -> val != 1,
>>>>                 val -> val % 2 == 0 ? val / 2 : val * 3 + 1),
>>>>         IntStream.of(1))
>>>>
>>>> A side note: having IntStream.append(int... numbers) would be really
>>>> nice:
>>>>
>>>> IntStream.iterate(start, val -> val != 1,
>>>> val -> val % 2 == 0 ? val / 2 : val * 3 + 1).append(1)
>>>>
>>>> Another approach would be to introduce a special stop value (for
>>>> example, -1):
>>>>
>>>> IntStream.iterate(start, val -> val != -1,
>>>> val -> val == 1 ? -1 : val % 2 == 0 ? val / 2 : val * 3 + 1)
>>>>
>>>> This stream produces the Collatz sequence, including the trailing one.
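A quick check that the append-one and sentinel variants agree, using the three-arg IntStream.iterate(int, IntPredicate, IntUnaryOperator) as it exists since Java 9. The CollatzStreams class and its method names are hypothetical; since IntStream.append does not exist, concat is used.

```java
import java.util.stream.IntStream;

class CollatzStreams {
    static int next(int val) {
        return val % 2 == 0 ? val / 2 : val * 3 + 1;
    }

    // Iterate until 1 is reached, then append the trailing 1.
    static IntStream withConcat(int start) {
        return IntStream.concat(
                IntStream.iterate(start, val -> val != 1, CollatzStreams::next),
                IntStream.of(1));
    }

    // Map 1 to the stop value -1, so that 1 itself is still emitted.
    static IntStream withSentinel(int start) {
        return IntStream.iterate(start, val -> val != -1,
                val -> val == 1 ? -1 : next(val));
    }
}
```

Both give 6, 3, 10, 5, 16, 8, 4, 2, 1 for a start of 6, and just 1 for a start of 1.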
>>>>
>>>> As for Craps, I had never heard of this game. If I understood the rules
>>>> correctly, it's good to represent the state as a separate object and
>>>> define the state transition via its method. Something like this should
>>>> work:
>>>>
>>>> Random r = new Random();
>>>> IntSupplier dice = () -> r.nextInt(6) + r.nextInt(6) + 2;
>>>> class State {
>>>>     int roll, point;
>>>>
>>>>     State(int roll, int point) {
>>>>         this.roll = roll;
>>>>         this.point = point;
>>>>     }
>>>>
>>>>     State() {
>>>>         this(dice.getAsInt(), 0);
>>>>     }
>>>>
>>>>     boolean isStopRound() {
>>>>         return roll == 7 || (point == 0 && (roll > 10 || roll < 4)) || (point != 0 && roll == point);
>>>>     }
>>>>
>>>>     State next() {
>>>>         return isStopRound() ? null : new State(dice.getAsInt(), point == 0 ? roll : point);
>>>>     }
>>>> }
>>>> Stream.iterate(new State(), Objects::nonNull, State::next)
>>>>       .mapToInt(state -> state.roll)
>>>>       .forEach(System.out::println);
>>>>
>>>> With best regards,
>>>> Tagir Valeev.
>>>>
>>>>
>>>> SM> In particular, I can imagine a number of cases where it would be very helpful to
>>>> SM> be able to support an empty stream, or where the computation to produce the
>>>> SM> first element is the same as the computation to produce subsequent elements.
>>>> SM> Requiring a value for the first stream element is at odds with that.
>>>>
>>>> SM> Here are some ideas for use cases to try out:
>>>>
>>>> SM> - a series of dice rolls representing a round of craps [1]
>>>> SM> - elements drawn from a queue until the queue is empty or until
>>>> SM> a sentinel is reached
>>>> SM> - a sequence of numbers that (probably) terminates but whose length
>>>> SM> isn't necessarily known in advance (e.g. Collatz sequence [2])
>>>>
>>>> SM> [1] https://en.wikipedia.org/wiki/Craps
>>>>
>>>> SM> [2] https://en.wikipedia.org/wiki/Collatz_conjecture
>>>>
>>>> SM> Note that in some cases the sentinel value that terminates the stream should be
>>>> SM> part of the stream, and in other cases it's not.
>>>>
>>>> SM> I'm sure you can find more use cases by perusing Stack Overflow. :-)
>>>>
>>>> SM> I'm a bit skeptical of the use of "iterate" for producing a finite stream. There
>>>> SM> are the usual issues with overloading, but there's also potential confusion as
>>>> SM> some forms of iterate() are infinite and others finite. I'll suggest the name
>>>> SM> "produce" instead, but there are surely better terms.
>>>>
>>>> SM> One thing to think about is where the state of the producer is stored. Is it
>>>> SM> expected to be in an argument that's passed to each invocation of the functional
>>>> SM> argument, or is it expected to be captured? I don't think there's an answer in
>>>> SM> isolation; examining use cases would probably shed some light here.
>>>>
>>>> SM> Here are a few API ideas (wildcards elided):
>>>>
>>>> SM> --
>>>>
>>>> SM> <T> Stream<T> iterate(T seed, Predicate<T> predicate, UnaryOperator<T> f)
>>>>
>>>> SM> The API from your proposal, for comparison purposes.
>>>>
>>>> SM> --
>>>>
>>>> SM> <T> Stream<T> produce(Supplier<Optional<T>>)
>>>>
>>>> SM> Produces elements until an empty Optional is returned. This boxes/unboxes every
>>>> SM> element, maybe(?) alleviated by Valhalla.
>>>>
>>>> SM> --
>>>>
>>>> SM> <T> Stream<T> produce(BooleanSupplier, Supplier<T>)
>>>>
>>>> SM> Calls the BooleanSupplier; if true the next stream element is what's returned by
>>>> SM> calling the Supplier. If BooleanSupplier returns false, end of stream. If you
>>>> SM> have an iterator already, this enables
>>>>
>>>> SM> produce(iterator::hasNext, iterator::next)
>>>>
>>>> SM> But if you don't have an iterator already, coming up with the functions to
>>>> SM> satisfy the iterator-style protocol is sometimes painful.
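A minimal sketch of this iterator-style variant, as a hypothetical Produce.produce method (not a JDK API); it simply forwards each tryAdvance call to the two functions.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class Produce {
    // Hypothetical: ask hasNext before every element, then take it from next.
    static <T> Stream<T> produce(BooleanSupplier hasNext, Supplier<T> next) {
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!hasNext.getAsBoolean()) {
                    return false; // end of stream
                }
                action.accept(next.get());
                return true;
            }
        }, false);
    }
}
```

With an existing iterator, Produce.produce(it::hasNext, it::next) streams its remaining elements in order.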
>>>>
>>>> SM> --
>>>>
>>>> SM> <T> Stream<T> produce(Predicate<Consumer<T>> advancer)
>>>>
>>>> SM> This has an odd signature, but the function is like Spliterator.tryAdvance(). It
>>>> SM> must either call the consumer once and return true, or return false without
>>>> SM> calling the consumer.
>>>>
>>>> SM> --
>>>>
>>>> SM> <T> Stream<T> produce(Consumer<Consumer<T>> advancer)
>>>>
>>>> SM> A variation of the above, without a boolean return. The advancer calls the
>>>> SM> consumer one or more times to add elements to the stream. End of stream occurs
>>>> SM> when the advancer doesn't call the consumer.
>>>>
>>>> SM> --
>>>>
>>>> SM> <T> Stream<T> produce(Supplier<Stream<T>>)
>>>>
>>>> SM> A variation of Supplier<Optional<T>> where the supplier returns a stream
>>>> SM> containing zero or more elements. The stream terminates if the supplier returns
>>>> SM> an empty stream. There's "boxing" overhead here, but we don't seem to be bothered
>>>> SM> by this with flatMap().
>>>>
>>>> SM> --
>>>>
>>>> SM> s'marks
>>>>
>>>>
>>>> SM> On 2/14/16 6:53 AM, Tagir F. Valeev wrote:
>>>>>> Hello!
>>>>>>
>>>>>> I wanted to work on foldLeft, but Brian asked me to take this issue
>>>>>> instead. So here's the webrev:
>>>>>> http://cr.openjdk.java.net/~tvaleev/webrev/8072727/r1/
>>>>>>
>>>>>> I don't like iterator-based Stream source implementations, so I made
>>>>>> them AbstractSpliterator-based. I also implemented forEachRemaining
>>>>>> manually, as I believe this improves performance in
>>>>>> non-short-circuiting cases.
>>>>>>
>>>>>> I also decided to keep two flags (started and finished) to track the
>>>>>> state. The current implementation of the infinite iterate() does not
>>>>>> use a started flag; instead it reads one element ahead for primitive
>>>>>> streams. This seems wrong to me and may even lead to unexpected
>>>>>> exceptions (*). I could get rid of the "started" flag for
>>>>>> Stream.iterate() by using Streams.NONE, but this would make the object
>>>>>> implementation different from the primitive implementations. It would
>>>>>> also be possible to keep a single three-state variable (byte or int:
>>>>>> NOT_STARTED, STARTED, FINISHED), but I doubt that this would improve
>>>>>> performance or footprint. Having two flags looks more readable to
>>>>>> me.
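A minimal sketch of the two-flag approach for the int case, as a hypothetical FiniteIterate.iterate (not the webrev code): the next element is computed only when it is actually requested, so nothing is read ahead, which also covers the data-walking example in the (*) footnote.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

class FiniteIterate {
    static IntStream iterate(int seed, IntPredicate hasNext, IntUnaryOperator next) {
        return StreamSupport.intStream(new Spliterators.AbstractIntSpliterator(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            private int prev;
            private boolean started, finished;

            @Override
            public boolean tryAdvance(IntConsumer action) {
                if (finished) {
                    return false;
                }
                int t;
                if (started) {
                    t = next.applyAsInt(prev); // computed lazily, only on demand
                } else {
                    t = seed;
                    started = true;
                }
                if (!hasNext.test(t)) {
                    finished = true; // never call next or hasNext again
                    return false;
                }
                action.accept(prev = t);
                return true;
            }
        }, false);
    }
}
```

For instance, FiniteIterate.iterate(0, x -> x < 5, x -> x + 1) yields 0 through 4, and the footnote's pipeline with x -> true as the predicate prints 0 to 4 without touching data[-1].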
>>>>>>
>>>>>> The existing two-arg iterate methods can now be expressed as a
>>>>>> special case of the new method:
>>>>>>
>>>>>> public static <T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {
>>>>>>     return iterate(seed, x -> true, f);
>>>>>> }
>>>>>> (same for primitive streams). I may do this if you think it's
>>>>>> reasonable.
>>>>>>
>>>>>> I created a new test class and added the new iterate sources to the
>>>>>> existing data providers.
>>>>>>
>>>>>> Please review and sponsor!
>>>>>>
>>>>>> With best regards,
>>>>>> Tagir Valeev.
>>>>>>
>>>>>> (*) Consider the following code:
>>>>>>
>>>>>> int[] data = {1, 2, 3, 4, -1};
>>>>>> IntStream.iterate(0, x -> data[x])
>>>>>>          .takeWhile(x -> x >= 0)
>>>>>>          .forEach(System.out::println);
>>>>>>
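>>>>>> Currently this unexpectedly throws an ArrayIndexOutOfBoundsException
>>>>>> (AIOOBE), because IntStream.iterate unnecessarily tries to read one
>>>>>> element ahead: when it produces -1, it already computes the next
>>>>>> element, data[-1].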
>>>>>>
>>>>
More information about the core-libs-dev
mailing list