RowProcessorOperation
Douglas Surber
douglas.surber at oracle.com
Fri Dec 15 22:21:22 UTC 2017
Note: This discussion started as a private exchange but Lance noted, correctly, that it should be on this list. With everyone’s permission I’m continuing it here. This is the first post on this list so you’ll have to read the quoted messages to get the context.
Mark,
Every Operation has a result value. That is the value that completes the CompletionStage that is returned by Submission.getCompletionStage. This value is the result of the database action. The result of a RowOperation is the value returned by Collector.finisher. RowProcessorOperation also has a result value which is the last value published by the Processor.
The ADBA driver publishes rows to the Processor and subscribes to the Processor for results. I know the JavaDoc that describes this is inadequate. I’m having a hard time finding the right language. Perhaps an example will help. This method takes a Connection and an invoice id and returns a CompletionStage producing the average price of the line items in the invoice.
public CompletionStage<Long> flow(Connection conn, int invoiceId) {
String sql = "select cost from line_item where invoice = :invoiceId";
Flow.Processor<Result.Row, Long> p = new Flow.Processor<>() {
protected long sum = 0;
protected long count = 0;
protected long resultDemand = 0;
protected Flow.Subscription rowSubscription;
protected Flow.Subscription resultSubscription = new Flow.Subscription() {
@Override
public void request(long n) {
// the driver will call this method to request a result value
resultDemand += n;
rowSubscription.request(n);
}
@Override
public void cancel() {
// Submission.cancel will call this method
rowSubscription.cancel();
}
};
protected Flow.Subscriber resultSubscriber;
@Override
public void onSubscribe(Flow.Subscription subscription) {
rowSubscription = subscription;
}
@Override
public void onNext(Result.Row item) {
// the driver will call this repeatedly, once for each row
sum += item.get("cost", Long.class);
count++;
rowSubscription.request(1);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
// the driver will call this when there are no more rows
resultSubscriber.onNext(sum / count); // publish the result
resultSubscriber.onComplete(); // no more results
}
@Override
public void subscribe(Flow.Subscriber<? super Long> subscriber) {
resultSubscriber = subscriber;
resultSubscriber.onSubscribe(resultSubscription);
}
};
return conn.<Long>rowProcessorOperation(sql)
.set("invoiceId", invoiceId)
.rowProcessor(p)
.submit()
.getCompletionStage();
}
My objection to PublisherOperation is that there is no result value. The driver publishes the rows to the Subscriber but never gets anything back. RowProcessorOperation does exactly the same thing but the Processor not only subscribes to the Rows but it also publishes results the last of which is the result value of the Operation.
Douglas
> On Dec 15, 2017, at 12:13 PM, Mark Paluch <mpaluch at pivotal.io> wrote:
>
> The result of the PublisherOperation is that the actual publisher receives a subscribe signal (Publisher#subscribe), the publisher notifies the subscriber with a subscription handle (Subscriber#onSubscribe) and starts emitting rows (Subscriber#onNext) as soon as the subscriber registers demand (Subscription#request). From a conceptional perspective, subscribing to the publisher is the last action that happens with the publisher, similar to what you’d achieve with a submission. PublisherOperation (in implementations most likely a delegate or communication component) is the one which should write requests to I/O and emit rows once received and decoded from I/O.
>
> A Processor is a Subscriber and a Publisher at the same time. It exposes onNext, onError and onComplete methods as well as a subscribe method. It requires to receive signals and pipes/broadcasts these to the subscribers that subscribe to the publisher. Typical use for a processor is to have a single upstream with multiple subscribers. Or what Lightbend did with their HTTP client as sort of a bridge: The processor receives HttpRequests and emits HttpResponses. For a database scenario that could be something like Processor<String, Row> where String represents a SQL string. I attached a few links [0] to [6] that point to implementations of processors in Project Reactor and RxJava 2. MongoDB follows a similar notion with their Reactive Streams driver to expose a fluent API (although their intermediate objects aren’t immutable) in which Publisher#subscribe is the action to initiate execution.
>
> Another aspect is that anyone passing in a processor can call methods on the processor. One could call onError/onComplete, change the state of the processor which makes it unusable for RowProcessorOperation as the processor is in terminated state and subsequent calls by RowProcessorOperation would fail as per ReactiveStreams (Flow) spec.
>
> [0] https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/DirectProcessor.java
> [1] https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java
> [2] https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/EventLoopProcessor.java
> [3] https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/TopicProcessor.java
> [4] https://github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/processors/BehaviorProcessor.java
> [5] https://github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/processors/FlowableProcessor.java
> [6] https://github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/processors/ReplayProcessor.java
> [7] https://github.com/mongodb/mongo-java-driver-reactivestreams/blob/master/driver/src/main/com/mongodb/reactivestreams/client/FindPublisher.java <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_mongodb_mongo-2Djava-2Ddriver-2Dreactivestreams_blob_master_driver_src_main_com_mongodb_reactivestreams_client_FindPublisher.java&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PZh8Bv7qIrMUB65eapI_JnE&r=DFcFtomvCycx6XeC1vpTp5TdBqlgkiljGpb5LljiXB4&m=rEyeBsAzN8m-M_934mpoZad5GZCpMpkhwpqB7Cz1CqE&s=7zSX8579HKwxfg2w0TNRkhxZei7c9xKjI7eEigrrdeo&e=>
>
> On 15. Dez. 2017, 18:23 +0100, Douglas Surber <douglas.surber at oracle.com>, wrote:
>> But what is the result of a PublisherOperation? How does this tie into the rest of the API? That’s why I replaced PublisherOperation with ProcessorOperation. An Operation must have a result that is a function of the database action. That’s how Operations interact with the rest of the app and with each other. RowProcessorOperation is nearly identical to PublisherOperation except that it defines the result of the Operation in a natural way.
>>
>> I don’t understand your comment that way Processor is used in RowProcessorOperation is not typical. The Processor reads an input stream, the rows and transforms it into an output stream, the results. That’s exactly what a Processor does. I can’t see any way this is atypical.
>>
>> Douglas
>>
>>> On Dec 15, 2017, at 8:37 AM, Mark Paluch <mpaluch at pivotal.io <mailto:mpaluch at pivotal.io>> wrote:
>>>
>>> Hi Douglas,
>>>
>>> Sorry it took me a while to get back to this topic.
>>>
>>> I think your (Lances) approach with PublisherOperation was quite promising. I created a proposal based on that interface. It does not reflect Operation<T> and the relationship to collect(Collector) yet, let’s keep that for a later step.
>>>
>>> /**
>>> * A publisher operation exposes a reactive execution model using
>>> * Flow API. It provides an alternative model to {@link Operation#submit}
>>> * allowing a streaming consumption of results.
>>> *
>>> * This {@link Operation} starts materializing this {@link Flow.Publisher}
>>> * on {@link #subscribe subscription}. The {@link Operation}
>>> * starts emitting {@link Result.Row rows} by calling {@link Flow.Subscriber#onNext}
>>> * as soon as the {@link Flow.Subscription} registers demand.
>>> *
>>> * If the {@link Operation} causes an error it will call
>>> * {@link Flow.Subscriber#onError} with the {@code Exception} then complete
>>> * exceptionally with that {@code Exception}. If the {@link Operation}
>>> * yields no result, it will call {@link Flow.Subscriber#onComplete}
>>> * without emitting {@link Result#Row}.
>>> * Calling {@link Flow.Subscription#cancel} requests cancellation similar to
>>> * {@link Submission#cancel}.
>>> * Each time a {@link Flow.Subscriber} subscribes to {@link PublisherOperation}
>>> * it initiates a new execution of the {@link Operation}.
>>> *
>>> * Implementations are free to decide whether the actual
>>> * execution should be initiated upon subscription or as soon as a
>>> * {@link Flow.Subscription} registers demand through {@link Flow.Subscription#request}.
>>> *
>>> * @param <T> the type of the result of the {@link Operation}
>>> */
>>> public interface PublisherOperation<T> extends Operation<T>, Flow.Publisher<Result.Row> {
>>>
>>> @Override
>>> public void subscribe(Flow.Subscriber<? super Result.Row> subscriber);
>>> }
>>>
>>> Let me know what you think about it.
>>>
>>> Cheers,
>>> Mark
>>>
>>> On 12. Dez. 2017, 17:55 +0100, Douglas Surber <douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>, wrote:
>>>> RowProcessorOperation does exactly what you asked for in the previous email. So you want something that does what RowProcessorOperation does but you want to get rid of RowProcessorOperation because the use of Processor isn’t typical. That’s fine but RowProcessorOperation is the best solution I have found to the problem. If you think it is not a good solution then by all means suggest an alternative.
>>>>
>>>> Douglas
>>>>
>>>>> On Dec 12, 2017, at 8:27 AM, Mark Paluch <mpaluch at pivotal.io <mailto:mpaluch at pivotal.io>> wrote:
>>>>>
>>>>> Hi Douglas,
>>>>>
>>>>> I’ve seen RowProcessorOperation and wasn’t entirely happy with methods accepting a Flow.Processor. Typical use for processors is broadcasting/multiplexing of events. IMHO RowProcessorOperation is a good candidate to get rid of.
>>>>>
>>>>> Cheers,
>>>>> Mark
>>>>>
>>>>> On 12. Dez. 2017, 17:15 +0100, Douglas Surber <douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>, wrote:
>>>>>> Have you looked at RowProcessorOperation?
>>>>>>
>>>>>> Douglas
>>>>>>
>>>>>>> On Dec 11, 2017, at 11:25 PM, Mark Paluch <mpaluch at pivotal.io <mailto:mpaluch at pivotal.io>> wrote:
>>>>>>>
>>>>>>> Hi Douglas –
>>>>>>>
>>>>>>> Thanks a lot for your answer. I didn’t want to stress Reactive Streams in the first place – There are a couple of perspectives on this topic and I see your point in not requiring additional composition libraries to consume ADBA.
>>>>>>>
>>>>>>> I see it right now more from the perspective of enabling choice in places where it makes sense, similar to convenience methods you provide already in, e.g. Connection with the difference that Connection.connect() suggests synchronous execution on top of a non-blocking API.
>>>>>>>
>>>>>>> ADBA is limiting in the way of consuming results as it requires a collector and collects results until an operation is complete. Comparing to JDBC, a ResultSet is a pull-style iterator allowing a streaming consumption of results. That said, large results in ADBA would require paging and multiple queries to consume the data set.
>>>>>>>
>>>>>>> Reactive Streams comes with a dozen of opportunities that allow rethinking today’s data access of which back pressure is just one of these. I wouldn’t reduce back pressure/demand-awareness to only a measure to not overwhelm the consumer.
>>>>>>>
>>>>>>> Communicating demand to the data source (driver or in the future even to the database) is a sophisticated mechanism that allows drivers to react: Drivers can control fetch size (if not already set by the consumer), decide the amount of data to read and when to do the actual read. Demand can also suspend reading from the database if the client has no demand leading to a more efficient, demand-driven resource usage.
>>>>>>>
>>>>>>> Allowing e.g. RowOperation to be a Flow.Publisher (with keeping submissions) removes the collector limitation for Reactive Streams consumers and thus the limitation of issuing multiple queries to a database. Having callbacks (e.g. onNext(Consumer<Row>), similar to onError(…)) would as well help to remove these limitations. Without these limitations, consumers are able to reduce latency to first result and overall improve GC pressure.
>>>>>>>
>>>>>>> I’m happy to continue discussion on that topic on the mailing list. We integrated recently various drivers for Spring’s reactive data access and are happy to share more about effects and implications that come when used in applications. That’s primarily why I asked about EG and the way of collaboration.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Mark
>>>>>>>
>>>>>>>
>>>>>>> On 11. Dez. 2017, 19:33 +0100, Douglas Surber <douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>, wrote:
>>>>>>>>>>> which seem limiting to us in terms of a streaming consumption of results (Flow.Publisher).
>>>>>>>>
>>>>>>>> ADBA is not intended to be a pure reactive stream API. That is not and never has been a goal. This is not to say such an API would be a bad thing, but that is not what ADBA set out to be. What is a reasonable requirement however is that it be possible to write a real pure reactive stream API on top of ADBA. We believe that the current version of ADBA supports this. If not, please let us know what is missing.
>>>>>>>>
>>>>>>>> Keep in mind that if back pressure is of no use to the implementation/database then there is no reason for ADBA to support back pressure. The goal of back pressure is to eliminate the need for arbitrarily large queues. If the queue size is intrinsically limited (to something reasonable) then there is no need for back pressure. In such a case ADBA does not support reactive streams (or any other mechanism to provide back pressure) as it would be superfluous. A pure reactive stream API might have to expose hooks that would in theory provide back pressure in such a case. There would be no loss if those hooks did little or nothing as the goal of the back pressure, limited queue size, is already met.
>>>>>>>>
>>>>>>>> Douglas
More information about the jdbc-spec-discuss
mailing list