<T> Stream.fromForEach(Consumer<Consumer<T>>)

Brian Goetz brian.goetz at oracle.com
Mon May 23 12:53:27 UTC 2022


This is a nice use of `mapMulti` to create a stream whose contents are 
dynamically generated.  It is one step short of generators (which we 
hope Loom will eventually give us), in that it is restricted to a 
generator method that generates all of the elements in one invocation.  
This is still quite expressive.  (In hindsight, the name `generate` was 
wasted on the current `Stream::generate`, which is not all that useful 
and taking up a good name.)

I agree that the naming needs work, and people may well need help 
navigating the type `Consumer<Consumer<T>>`. Push is evocative, but 
there's more than just push; it's more like you have to kick something 
into pushing.  (Perhaps it is like the mythical Pushmi-Pullyu beast.)

Ideally, what is captured in the naming is that the outer `consumer` 
method is called exactly once, and that it should call the inner 
consumer to push elements.  (Of course that's a lot to capture.)  
Something that evokes "build by push", perhaps.  Stream::fromPush is 
about the best I've got right now.

Are there any useful Consumer<Consumer<T>> in the wild, that are 
actually typed like that?  I doubt it (there are many things convertible 
to it, though.)  Which suggests it might be fruitful to define a FI:

     interface PushSource<T> {
         void accept(Consumer<? extends Consumer<T>> pusher);
     }

     static<T> Stream<T> fromPush(PushSource<T> source) { ... }

and Iterable::forEachRemaining and Optional::ifPresent will convert to it.


On 5/21/2022 6:54 PM, Remi Forax wrote:
> Hi all,
> a stream is kind of push iterator so it can be created from any object that has a method like forEach(Consumer),
> but sadly there is no static method to create a Stream from a Consumer of Consumer so people usually miss that creating a Stream from events pushed to a consumer is easy.
>
> By example, let say i've a very simple parser like this, it calls the listener/callback for each tokens
>
>    record Parser(String text) {
>      void parse(Consumer<? super String> listener) {
>        for(var token: text.split(" ")) {
>           listener.accept(token);
>        }
>      }
>    }
>
> Using the method Stream.fromForEach, we can create a Stream from the sequence of tokens that are pushed through the listener
>    var parser = new Parser("1 2");
>    var stream = Stream.fromForEach(parser::parse);
>
> It can also works with an iterable, an optional or even a collection
>    Iterable<String> iterable = ...
>    var stream = Stream.fromForEach(iterable::forEachRemaning);
>
>    Optional<String> optional = ...
>    var stream = Stream.fromForEach(optional::ifPresent);
>
>    List<String> list = ...
>    var stream = Stream.fromForEach(list::forEach);
>
> I known the last two examples already have their own method stream(), it's just to explain how Stream.fromForEach is supposed to work.
>
> In term of implementation, Stream.fromForEach() is equivalent to creating a stream using a mapMulti(), so it can be implemented like this
>
>    static <T> Stream<T> fromForEach(Consumer<? super Consumer<T>> forEach) {
>      return Stream.of((T) null).mapMulti((__, consumer) -> forEach.accept(consumer));
>    }
>
> but i think that Stream.fromForEach(iterable::forEachRemaning) is more readable than Stream.of(iterable).mapMult(Iterable::forEachRemaining).
>
> The name fromForEach is not great and i'm sure someone will come with a better one.
>
> regards,
> Rémi


More information about the core-libs-dev mailing list