Re[2]: Reactive Stream Questions
Олег Докука
shadowgun at i.ua
Wed Aug 1 07:14:31 UTC 2018
I agree with Stephane and James: it would be better to assume that a Session is given not for the duration of the onNext invocation, but for the life cycle of the produced sub-publisher. Stephane's option (ds.connect()) makes the API more explicit, which is better from my point of view. However, in the case of DataSource extends Publisher, it would be better to open the Session when the Function<Session, Publisher> produces its Publisher and close it when the inner publisher completes, so the code might look like the following:
Session session = new Session(..);
Publisher publisher = function.apply(session);
session.open();
publisher.subscribe(new Subscriber() {
    ....
    public void onError(Throwable t) {
        session.close(); // release the session if the inner stream fails
    }
    public void onComplete() {
        session.close(); // release the session once the inner stream completes
    }
});
So, as the code shows, the session is closed only when the inner stream has completed, which gives us asynchronous behaviour and allows execution from potentially different threads, etc.
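
To make the idea a bit more concrete, here is a minimal, self-contained sketch on top of java.util.concurrent.Flow. Everything in it is an assumption for illustration only: the Session class is a stand-in with just open()/close(), connect(...) is a hypothetical helper and not part of any proposed API, and just(...) is a toy synchronous publisher that is not TCK-verified. It only shows that the session stays open while rows are delivered and is closed on a terminal signal or on cancel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

class SessionScopeSketch {

    /** Hypothetical stand-in for the Session type discussed in this thread. */
    static final class Session {
        private final AtomicBoolean open = new AtomicBoolean();
        void open() { open.set(true); }
        void close() { open.set(false); }
        boolean isOpen() { return open.get(); }
    }

    /** Hypothetical helper: opens the session on subscribe, closes it on a terminal signal or cancel. */
    static <V> Flow.Publisher<V> connect(Session session,
                                         Function<Session, Flow.Publisher<V>> function) {
        return downstream -> {
            session.open();
            function.apply(session).subscribe(new Flow.Subscriber<V>() {
                public void onSubscribe(Flow.Subscription upstream) {
                    // Wrap the subscription so that a cancel also releases the session.
                    downstream.onSubscribe(new Flow.Subscription() {
                        public void request(long n) { upstream.request(n); }
                        public void cancel() { session.close(); upstream.cancel(); }
                    });
                }
                public void onNext(V item) { downstream.onNext(item); }
                public void onError(Throwable t) { session.close(); downstream.onError(t); }
                public void onComplete() { session.close(); downstream.onComplete(); }
            });
        };
    }

    /** Toy synchronous publisher, used only to exercise the sketch. */
    static <T> Flow.Publisher<T> just(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index;
            private boolean done;
            public void request(long n) {
                while (n-- > 0 && !done && index < items.size()) {
                    subscriber.onNext(items.get(index++));
                }
                if (!done && index == items.size()) {
                    done = true;
                    subscriber.onComplete();
                }
            }
            public void cancel() { done = true; }
        });
    }

    /** Subscribes once and records whether the session was open at each signal. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Session session = new Session();
        Flow.Publisher<String> rows = connect(session, s -> just(List.of("row1", "row2")));
        rows.subscribe(new Flow.Subscriber<String>() {
            public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            public void onNext(String row) { log.add(row + " open=" + session.isOpen()); }
            public void onError(Throwable t) { log.add("error open=" + session.isOpen()); }
            public void onComplete() { log.add("complete open=" + session.isOpen()); }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the session is closed just before the terminal signal is forwarded downstream, so the subscriber sees an open session in onNext and a closed one in onComplete. Whether close should happen before or after forwarding the signal is a design choice I am not trying to settle here.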
With the best,
Oleh Dokuka
01.08.2018 05:14, Stephane Maldini <stephane.maldini at gmail.com>
>>
> > Q1: Is Flow.Publisher<Void> ok? I assume only onComplete or onError would
> > be called.
>
>
> It's totally fine and testable with the RS TCK: onSubscribe followed by
> onComplete or onError.
> Note that if there is a delay between onSubscribe and onComplete|onError,
> the subscriber could cancel its subscription before the terminal signal is
> received.
> Note 2: there is no need for any request(n) by the Subscriber<Void> in this
> scenario (no demand is necessary for a terminal signal).
> Note 3: Calling subscription methods after observing a terminal signal is
> forbidden. Since emitting such a signal implies the publisher has already
> cleaned up the subscription state, it's unlikely to be flagged as an issue,
> but compliance is necessary. This is probably related to RxJava 1 history,
> where you would dispose() even when a terminal signal was observed.
>
> Q2: Which is better, subscribe(Flow.Processor) or
> > subscribe(Flow.Subscriber, Flow.Publisher)?
>
> This is to replace RowPublisherOperation.subscribe(Flow.Subscriber,
> CompletionStage).
>
> I'm not familiar enough with the need for such a subscribe, so please
> excuse my ignorant comment :) Why do we need a result callback separate
> from the signals emitted to the subscriber, rather than simply making
> RowPublisherOperation an effective Publisher<RowColumn>, or exposing an
> underlying method RowPublisherOperation.columns() : Publisher<RowColumn>
> (which can be overridden by an implementor using a specific reactive API)?
>
>
> Q3: Assume DataSource extends Flow.Publisher<Session>.
> > ds.flatMap( s -> {
> > try(Session session = s.attach()) {
> > session.<Employee>rowPublisherOperation(sql) . . .
> > }
> > });
> > or
> > ds.flatMap( session -> {
> > session.<Employee>rowPublisherOperation(sql) . . .
> > });
>
>
> This is an interesting question and probably something I wouldn't use
> Publisher directly for. You need control over the callback and a delimited
> callback boundary that RS won't give you just because you react to onNext.
> You might want to consider something such as:
> withSession(Function<Session, Publisher<Void>>) : Publisher<Void> -
> terminate the session when the returned Publisher<Void> emits a
> terminal signal. It will make it harder to extract any data outside of the
> callback without coordinating with a side-effect queue/message-passing
> structure.
> Or
> connect(Function<Session, Publisher<V>>) : Publisher<V> - extract data
> from the session and return it via the returned Publisher<V>, terminate
> the session when it observes a terminal signal.
> Note that this last suggestion could be split into multiple steps if you
> want to define a mapper/row extractor and then a closing selector for the
> session:
>
> ds.connect(s -> s.rowPublisherOperation(sql).rows())
> .closeOn(row -> !row.hasNext())
> // or .closeOn(Publisher<Void>)
>
> But it might be premature to refine that kind of API guidance; at
> least connect(Function<Session,
> Publisher<V>>) : Publisher<V> keeps all options open and doesn't leak the
> session closing.
>
>
>
> On Tue, Jul 31, 2018 at 6:44 AM, Douglas Surber <douglas.surber at oracle.com>
> wrote:
>
> > Please do not use this thread to discuss the reactive stream proposal.
> > Feel free to discuss the answers to the questions, but not the overall
> > proposal.
> >
> > Q1: Is Flow.Publisher<Void> ok? I assume only onComplete or onError would
> > be called.
> >
> > Q2: Which is better, subscribe(Flow.Processor) or
> > subscribe(Flow.Subscriber, Flow.Publisher)?
> > This is to replace RowPublisherOperation.subscribe(Flow.Subscriber,
> > CompletionStage).
> >
> > Q3: Assume DataSource extends Flow.Publisher<Session>.
> >
> > ds.flatMap( s -> {
> > try(Session session = s.attach()) {
> > session.<Employee>rowPublisherOperation(sql) . . .
> > }
> > });
> >
> > or
> >
> > ds.flatMap( session -> {
> > session.<Employee>rowPublisherOperation(sql) . . .
> > });
> >
> > The second requires that session only be valid for the duration of the
> > call to onNext.
> > The point is that a Session still must be attached and closed. Other
> > alternatives?
> >
> > Douglas
>
>
>
>
> --
> *Stéphane*
> --
More information about the jdbc-spec-discuss
mailing list