Completing the result of a RowPublisherOperation

James Roper james at
Wed Jul 25 04:52:03 UTC 2018

Hi all,

Apologies if this reply creates a new thread, I wasn't subscribed to the
list at the time the message was sent and so I'm replying to a forward of
the email.

Douglas wrote:

> I presented ADBA at the Silicon Valley Java Users Group last night. A
> person there asked an excellent question.
> A RowPublisherOperation has a Subscriber and a CompletionStage that
> provides the result of the Operation. What happens if the CompletionStage
> is completed before onError or onComplete is called? That is the Subscriber
> is busily processing RowColumns and for whatever reason calls
> CompletableFuture.complete(foo) on the result while still continuing to
> process RowColumns.

It's a good question. I think a key thing to discuss is what happens if the
subscriber cancels? Because the most common case where this will happen in
practice is in situations where the subscriber decides it's not interested
in any more elements, in which case, it will cancel, and complete the
CompletionStage (note that there's no way to make any assumptions about the
ordering of those two operations). To give a concrete example, using the
MicroProfile Reactive Streams Operators spec (this will hopefully make it
to the JDK one day), let's say I do a query, but then I also want to do
some filtering on the results that I couldn't do via SQL in the database,
for example, I'm querying the items from an invoice, but I only want to
take the first n items such that the sum of the prices of those items is
less than $100. So I can create a Subscriber/CompletionStage pair like this:

AtomicInteger totalPrice = new AtomicInteger();
CompletionSubscriber<Row, List<Row>> result =
    .takeWhile(row -> {
      int price = row.getInt("price");
      int total = totalPrice.addAndGet(price);
      return total < 100;
// And here's our Subscriber/CompletionStage pair:
Subscriber<Row> subscriber = result;
CompletionStage<List<Row>> = result.getCompletionStage();

So, in the above scenario, passing the above subscriber/completionstage
pair, if the total exceeds $100, the completion stage will complete before
the stream is finished, and the subscriber will cancel its subscription. So
what will happen in that scenario? I'm not 100% sure on the underlying
implementation of ADBA as to what makes sense, but here are some
possibilities as I see it:

* Subscribers are not allowed to cancel as a part of normal operation, if
they do, the entire sequence of operations is cancelled.
* Subscribers are allowed to cancel. If they cancel, the rest of the
results for that particular operation are ignored, and once the completion
stage is redeemed, the driver proceeeds to the next operation. As an
optimisation, if the underlying driver supports it, it could send a signal
to the database to stop it from pushing the rows that it's going to ignore
anyway (or if the database protocol is pull based, it can stop pulling and
cancel the cursor). This would be conceptually equivalent to calling
ResultSet.close() in the current jdbc api before you've iterated through
all the rows of it, but then continuing on with the transaction like normal.

So, what happens if it completes without cancelling the subscription? I'd
say ADBA should proceed to the next operation when both the subscriber is
completed, and the completion stage is completed.

The interesting part of this question is that completing the result allows
> the next Operation to execute while at the same time the Subscriber
> continues to process rows.
> Here are some alternatives:
> - Disallow it. Require that onError or onComplete must be called before
> the result is completed.

Even if the subscription doesn't support cancelling as a signal to ignore
the remaining elements, I think it would be good to allow the completion
stage to be allowed to return early. Besides, ensuring the strict
happens-before ordering of these events could possibly be difficult in
certain situations, especially when you're using abstractions to implement
the reactive streams interfaces (as people should be) that may
asynchronously dispatch a signal to complete the CompletionStage which
could be executed before onError or onComplete return.

- Do What I Mean. Completing the result before onError or onComplete is the
> same as calling cancel.

I think you're right that this is "do what I mean", but the behaviour of
what it means to cancel may be up for debate here.

> - Ignore it. The Operation completes only when the result is completed and
> onError or onComplete return.

Not sure that it should be ignored, just combined with onError/onComplete

> - Run with it. The Operation completes when the result is completed and
> the implementation continues to call onNext.
> - Vendor specific. The implementation continues to call onNext. The
> Operation completes when the implementation decides to complete it. Maybe
> immediately. Maybe after all the rows are processed. Maybe somewhere in
> between. Somewhere between Ignore it and Run with it inclusive.
> Thoughts?
> Douglas
> --
> Cheers,
> ——————
> *Viktor Klang*
> Deputy CTO
> Lightbend, Inc.

*James Roper*
*Senior Developer, Office of the CTO*

Lightbend <> – Build reactive apps!
Twitter: @jroper <>

More information about the jdbc-spec-discuss mailing list