Reactive Stream Questions

James Roper james at lightbend.com
Wed Aug 1 01:04:06 UTC 2018


On Tue, 31 Jul 2018 at 23:54, Douglas Surber <douglas.surber at oracle.com>
wrote:

> Please do not use this thread to discuss the reactive stream proposal.
> Feel free to discuss the answers to the questions, but not the overall
> proposal.
>
> Q1: Is Flow.Publisher<Void> ok? I assume only onComplete or onError would
> be called.
>

Only onComplete or onError could be called - Void is uninstantiable, so
null is the only valid value for it, but onNext is required to throw NPE if
passed null. I don't see any issues with it for signalling completion or
failure but nothing else, the Void type makes it very explicit that no
elements will ever be passed because none can be.

Q2: Which is better, subscribe(Flow.Processor) or
> subscribe(Flow.Subscriber, Flow.Publisher)?
>     This is to replace RowPublisherOperation.subscribe(Flow.Subscriber,
> CompletionStage).
>

I'd say Flow.Processor, it's trivial to create a processor from a
subscriber and publisher, and in cases where you already have a processor,
it's less boiler plate to pass it once rather than as both parameters.


> Q3: Assume DataSource extends Flow.Publisher<Session>.
>
>     ds.flatMap( s -> {
>       try(Session session = s.attach()) {
>         session.<Employee>rowPublisherOperation(sql) . . .

      }
>     });
>

Correct me if I'm wrong, but this option would require that you block on
the result of the operation, otherwise the session will be closed before
its finished being used. That would defeat the purpose of having an
asynchronous database driver.

    or
>
>     ds.flatMap( session -> {
>       session.<Employee>rowPublisherOperation(sql) . . .
>     }
>
>     The second requires that session only be valid for the duration of the
> call to onNext.
>
    The point is that a Session still must be attached and closed. Other
> alternatives?
>

Who/what is providing the flatMap method? The thing about onNext is that
most reactive streams implementations immediately dispatch it to another
thread pool. So onNext will typically return immediately after that
dispatch. onNext (or any other methods on Subscriber) should not be viewed
as callbacks or as units of processing, they should just be viewed as
signals. So, a publisher signals an element by invoking onNext. How and
when that element is processed is out of scope and the publisher must not
make any assumptions about it, it could be processed synchronously in the
onNext method, but chances are, it will be processed asynchronously at a
different time by a different thread.

All that said though, there's no need in the above to make any assumptions
about onNext, because you're using flatMap. flatMap requires the user to
return a Publisher. The DataSource can know that the handler has finished
processing the element by subscribing to that publisher and listening for
its onComplete signal. In the mean time, the invocation of onNext will have
been a thing way in the past, the handler of the session will have done
many different asynchronous operations on the session and possibly
involving other network connections, and the data source doesn't care, it's
just listening for the onComplete signal to know when it can close the
session.




>
> Douglas



-- 
*James Roper*
*Senior Developer, Office of the CTO*

Lightbend <https://www.lightbend.com/> – Build reactive apps!
Twitter: @jroper <https://twitter.com/jroper>


More information about the jdbc-spec-discuss mailing list