Forked streams
Paul Sandoz
paul.sandoz at oracle.com
Fri Nov 29 05:49:31 PST 2013
On Nov 29, 2013, at 1:41 PM, Richard Warburton <richard.warburton at gmail.com> wrote:
> tl;dr whilst the core Java streams library may not be the right place to support forkable streams, you can use RxJava to get what you want.
>
Yes, i am glad you brought up RxJava, it's a great example of an alternative streamy API with a different model.
> Another thing to mention on the front of forkable streams is the RxJava project which brings reactive programming to the JVM. [0] They have a concept called an Observable, that in many ways acts like stream. The underlying threading model is quite different though, I won't go into details but if you're interested in project lambda, then you should also take a look at RxJava.
>
Indeed, there is some sophisticated stuff going on underneath, as well as a rich set operations on Observable.
> They have a way of connecting observables [1] which means elements in one observable can be multicasted to other observables. The differences in the underlying implementation between streams and observables mean that while the idea of multicasting values from one stream into another brings a series of complications that Paul has outlined, they don't really exist in RxJava. Which is why I suggest that if you want that feature, maybe RxJava is the library that you really should be using rather than Streams.
>
Right, pick the API/framework to match the programming model.
Even if the Stream API evolves to support some style of forking [1] or cancellation it is unlikely to ever evolve to support the same model as RxJava.
It would be interesting to explore how Stream and RxJava could be hooked up, or even how RxJava could take advantage of Spliterator.
Paul.
[1] e.g. via an SPI
Stream<String> s = ...
ForkingStream<String> f = s.op(forking()); // terminal op
CompletableFuture<Integer> r = f.pitch(
s -> s.mapToInt(...).sum());
CompletableFuture<Map<Integer, Long>> r = f.pitch(
s -> s.collect(Collectors.groupingBy(String::length, Collectors.counting())));
CompletableFuture<Void> all = f.fork();
More information about the lambda-dev
mailing list