Reactive Stream Questions

Stephane Maldini stephane.maldini at gmail.com
Wed Aug 1 02:14:16 UTC 2018


>
> Q1: Is Flow.Publisher<Void> ok? I assume only onComplete or onError would
> be called.


It's totally fine and testable with the RS TCK: onSubscribe followed by
onComplete or onError.
Note 1: if there is a delay between onSubscribe and onComplete|onError,
the subscriber could cancel its subscription before the terminal signal is
received.
Note 2: there is no need for any request(n) by the Subscriber<Void> in this
scenario (no demand is necessary for a terminal signal).
Note 3: calling subscription methods after observing a terminal signal is
forbidden. Since emitting such a signal implies the publisher has already
cleaned up the subscription state, it's unlikely to be flagged as an issue,
but compliance is still necessary. This is probably related to RxJava 1
history, where you would dispose() even after a terminal signal was observed.
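
To make the signal order concrete, here is a minimal single-threaded sketch of a Flow.Publisher<Void> that completes without any demand. The names VoidPublisherSketch, CompletedPublisher and run are made up for illustration, and the sketch is not meant to pass the whole TCK (for instance, it does not validate request(n) arguments):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;

class VoidPublisherSketch {

    /** Hypothetical Publisher<Void>: signals onSubscribe, then onComplete, regardless of demand. */
    static final class CompletedPublisher implements Flow.Publisher<Void> {
        @Override
        public void subscribe(Flow.Subscriber<? super Void> subscriber) {
            Objects.requireNonNull(subscriber, "subscriber");
            boolean[] cancelled = {false}; // single-threaded sketch, so no atomics
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { /* nothing to emit; demand is irrelevant here */ }
                @Override public void cancel() { cancelled[0] = true; }
            });
            // The subscriber may have cancelled inside onSubscribe (Note 1).
            if (!cancelled[0]) {
                subscriber.onComplete();
            }
        }
    }

    /** Subscribes without ever calling request(n) and records the signals received (Note 2). */
    static List<String> run() {
        List<String> signals = new ArrayList<>();
        new CompletedPublisher().subscribe(new Flow.Subscriber<Void>() {
            @Override public void onSubscribe(Flow.Subscription s) { signals.add("onSubscribe"); }
            @Override public void onNext(Void item) { signals.add("onNext"); }
            @Override public void onError(Throwable t) { signals.add("onError"); }
            // Note 3: the subscription must not be touched after this point.
            @Override public void onComplete() { signals.add("onComplete"); }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [onSubscribe, onComplete]
    }
}
```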

> Q2: Which is better, subscribe(Flow.Processor) or
> subscribe(Flow.Subscriber, Flow.Publisher)?
>     This is to replace RowPublisherOperation.subscribe(Flow.Subscriber,
> CompletionStage).

I'm not familiar enough with why such a subscribe is needed, so please
excuse my ignorant comment :) Why do we need a result callback separate
from the signals emitted to the subscriber? Why not simply make
RowPublisherOperation itself a Publisher<RowColumn>, or expose one via an
underlying method RowPublisherOperation.columns() : Publisher<RowColumn>
(which can be overridden by an implementor using a specific reactive API)?


> Q3: Assume DataSource extends Flow.Publisher<Session>.
>     ds.flatMap( s -> {
>       try(Session session = s.attach()) {
>         session.<Employee>rowPublisherOperation(sql) . . .
>       }
>     });
>     or
>     ds.flatMap( session -> {
>       session.<Employee>rowPublisherOperation(sql) . . .
>     }


This is an interesting question, and probably something I wouldn't use
Publisher directly for. You need control over the callback and a delimited
callback boundary, which RS won't give you just because you react to onNext.
You might want to consider something such as:
 withSession(Function<Session, Publisher<Void>>) : Publisher<Void> -
terminates the session when the returned Publisher<Void> emits a
terminal signal. It makes it harder to extract any data outside of the
callback without coordinating with a side-effect queue/message-passing
structure.
Or
 connect(Function<Session, Publisher<V>>) : Publisher<V> - extracts data
from the session and returns it via the returned Publisher<V>, terminating
the session when it observes a terminal signal.
Note that this last suggestion could be split into multiple steps if you
want to define a mapper/row extractor and then a closing selector for the
session:

ds.connect(s -> s.rowPublisherOperation(sql).rows())
    .closeOn(row -> !row.hasNext())
    // or .closeOn(Publisher<Void>)

But it might be premature to refine the API guidance that far; at
least connect(Function<Session, Publisher<V>>) : Publisher<V> keeps all
options open and doesn't leak the session closing.
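
For illustration only, here is a rough sketch of how such a connect could be built on java.util.concurrent.Flow. Everything here is an assumption: Session is a stand-in interface, connect takes a Supplier<Session> instead of living on the DataSource, fromList is a toy publisher, and closing on cancel (in addition to the terminal signals) is my own choice:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;

class ConnectSketch {

    /** Hypothetical stand-in for the Session type discussed in this thread. */
    interface Session extends AutoCloseable {
        @Override void close();
    }

    /** Opens a session per subscriber, relays the body's signals, closes the session exactly once. */
    static <V> Flow.Publisher<V> connect(Supplier<Session> open,
                                         Function<Session, Flow.Publisher<V>> body) {
        return downstream -> {
            Session session = open.get();
            AtomicBoolean closed = new AtomicBoolean();
            Runnable closeOnce = () -> {
                if (closed.compareAndSet(false, true)) session.close();
            };
            body.apply(session).subscribe(new Flow.Subscriber<V>() {
                @Override public void onSubscribe(Flow.Subscription upstream) {
                    downstream.onSubscribe(new Flow.Subscription() {
                        @Override public void request(long n) { upstream.request(n); }
                        @Override public void cancel() { upstream.cancel(); closeOnce.run(); }
                    });
                }
                @Override public void onNext(V item) { downstream.onNext(item); }
                @Override public void onError(Throwable t) { closeOnce.run(); downstream.onError(t); }
                @Override public void onComplete() { closeOnce.run(); downstream.onComplete(); }
            });
        };
    }

    /** Toy synchronous publisher; simplified (no reentrancy guard, no n <= 0 check). */
    static <T> Flow.Publisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int index = 0;
            boolean done = false;
            @Override public void request(long n) {
                while (n-- > 0 && !done && index < items.size()) {
                    subscriber.onNext(items.get(index++));
                }
                if (!done && index == items.size()) {
                    done = true;
                    subscriber.onComplete();
                }
            }
            @Override public void cancel() { done = true; }
        });
    }

    /** Runs the sketch and returns the observed order of events. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Supplier<Session> open = () -> {
            log.add("open");
            return () -> log.add("close");
        };
        Flow.Publisher<String> rows = connect(open, s -> fromList(List.of("row1", "row2")));
        rows.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { log.add(item); }
            @Override public void onError(Throwable t) { log.add("error"); }
            @Override public void onComplete() { log.add("complete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [open, row1, row2, close, complete]
    }
}
```

The caller never sees the Session outside the function passed to connect, so closing stays inside the operator rather than leaking into user code.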



On Tue, Jul 31, 2018 at 6:44 AM, Douglas Surber <douglas.surber at oracle.com>
wrote:

> Please do not use this thread to discuss the reactive stream proposal.
> Feel free to discuss the answers to the questions, but not the overall
> proposal.
>
> Q1: Is Flow.Publisher<Void> ok? I assume only onComplete or onError would
> be called.
>
> Q2: Which is better, subscribe(Flow.Processor) or
> subscribe(Flow.Subscriber, Flow.Publisher)?
>     This is to replace RowPublisherOperation.subscribe(Flow.Subscriber,
> CompletionStage).
>
> Q3: Assume DataSource extends Flow.Publisher<Session>.
>
>     ds.flatMap( s -> {
>       try(Session session = s.attach()) {
>         session.<Employee>rowPublisherOperation(sql) . . .
>       }
>     });
>
>     or
>
>     ds.flatMap( session -> {
>       session.<Employee>rowPublisherOperation(sql) . . .
>     }
>
>     The second requires that session only be valid for the duration of the
> call to onNext.
>     The point is that a Session still must be attached and closed. Other
> alternatives?
>
> Douglas




-- 
*Stéphane*
--


More information about the jdbc-spec-discuss mailing list