JDBC Next questions regarding j.u.c.Flow integration

Douglas Surber douglas.surber at oracle.com
Fri Oct 6 21:42:58 UTC 2017


> On Oct 6, 2017, at 1:17 PM, Dávid Karnok <akarnokd at gmail.com> wrote:
> 
> Interesting. It helps quite a lot if there is a rich fluent library backing the Flow API, which, unlike CompletableFuture or Stream, is as a matter of fact not directly available inside the JDK (btw, all sorts of flow manipulation are possible, including getting one item from a flow).

It is quite likely the lack of “a rich fluent library backing the Flow API” that has so far doomed our attempts at producing reactive-stream-based async db access. It may also partly explain why the Class Library team has encouraged us to use CompletableFuture.

Please look at Connection.onSubscription and Row.cancel and Row.request.

Rows are stateful but their state is not required to exist after the termination of a call to handle them.
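
A minimal sketch of that contract, not the actual API: Row, RowPublisher and CopyingSubscriber below are hypothetical stand-ins, and the assumption that the producer reuses a single Row instance is made only for illustration. The subscriber requests one row at a time and copies what it needs before the call returns; a retained Row reference only shows whatever was loaded last.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class RowBackpressureSketch {

    // Hypothetical stateful row: the producer overwrites it for each delivery.
    static final class Row {
        private String value;
        void load(String v) { value = v; }
        String get() { return value; }
    }

    // Hypothetical synchronous, single-threaded publisher that reuses one Row
    // and emits only as many rows as the subscriber has requested.
    static final class RowPublisher implements Flow.Publisher<Row> {
        private final List<String> data;
        RowPublisher(List<String> data) { this.data = data; }

        @Override public void subscribe(Flow.Subscriber<? super Row> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private final Row shared = new Row();
                private int index;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override public void request(long n) {
                    if (done) return;
                    if (n <= 0) {           // non-positive demand is an error
                        done = true;
                        subscriber.onError(new IllegalArgumentException("n must be > 0"));
                        return;
                    }
                    demand += n;
                    if (emitting) return;   // re-entrant call from onNext
                    emitting = true;
                    while (demand > 0 && index < data.size() && !done) {
                        demand--;
                        shared.load(data.get(index++));   // overwrite shared state
                        subscriber.onNext(shared);
                    }
                    emitting = false;
                    if (index == data.size() && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override public void cancel() { done = true; }
            });
        }
    }

    // Requests one row at a time and copies its data out inside onNext.
    static final class CopyingSubscriber implements Flow.Subscriber<Row> {
        final List<String> copied = new ArrayList<>();
        final List<Row> retained = new ArrayList<>();
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);
        }
        @Override public void onNext(Row row) {
            copied.add(row.get());    // safe: value copied during the call
            retained.add(row);        // kept only to demonstrate the hazard
            subscription.request(1);  // ask for the next row when ready
        }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    static CopyingSubscriber consume(List<String> data) {
        CopyingSubscriber s = new CopyingSubscriber();
        new RowPublisher(data).subscribe(s);
        return s;
    }

    public static void main(String[] args) {
        CopyingSubscriber s = consume(List.of("a", "b", "c"));
        System.out.println(s.copied);                 // [a, b, c]
        System.out.println(s.retained.get(0).get());  // c
    }
}
```

With a single subscriber the copy is cheap; a fanout to several consumers would have to do the same copy before dispatching.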

> 
> I haven't looked through all the proposed API but we have a couple of declarative patterns in the reactive world that could be applied, although they may look odd from an imperative perspective and one needs those fluent libraries too.
> 
> 1) Backpressuring operations
> 
> You can define a method on Connection/OperationGroup that takes a Function. This function then takes a class that can hand out Operation builders and should return a Flow.Publisher<Operation>. It could return a Flow.Publisher<ActiveOperation> and depending on the original operation, a Flow.Publisher<Row> can be retrieved from this ActiveOperation. I think the Operation and ActiveOperation may enter and leave the flows in order, but one could add a tag object to both Operation and ActiveOperation so the producers and consumers can match up across the flows.
> 
> Flow.Publisher<ActiveOperation> handle(Function<OperationSupplier, Flow.Publisher<Operation>> operationSupplier);
> 
> What should happen if the user calls handle() multiple times or mixed with any non-Flow method? If the non-Flow methods are implemented on top of handle() as well, one can merge/concatenate all Flow.Publisher<Operation> into one main Flow.Publisher. Or there is always the option to throw an exception...
> 
> 2) Backpressuring rows
> 
> That's straightforward: consume the aforementioned Flow.Publisher<Row> and limit it to at most 1 Subscriber. I guess the trouble is with Row being stateful: either onNext gets a fresh Row object every time, or it gets the same Row object, which then has to be processed carefully by the end consumer (i.e., a fanout would have to copy data out before dispatching).
> 
> Flow.Publisher<Row> getSharedRows();
> Flow.Publisher<Row> getRows() = getSharedRows().map(r -> r.copy());
> 
> 
> 
> 2017-10-06 20:36 GMT+02:00 Douglas Surber <douglas.surber at oracle.com>:
> The use of Flow never seemed organic. It always ended up looking like an exercise to use Flow somehow rather than a natural consequence of the problem we are trying to solve. The one place where Flow sort of fits is RowOperation, witness PublisherOperation. The rest of the API revolves around closures and PublisherOperation is very different. Further, Operations have results and there is no clear way to get a single result from a reactive stream. Well, there is, with an aggregator. But that’s what RowOperation does so the benefit from PublisherOperation is unclear.
> 
> Using Flow to handle Operation creation/submission just doesn’t work. It would be a mistake to push or pull Operations through a reactive stream. Operations are created by the Connection/OperationGroup. Once created there is no value to passing them back; in fact, doing so would be a source of errors. So the user code publishing a stream of Operations that the Connection subscribes to just doesn’t work. All that is needed is cancel and request hence Connection.onSubscribe. This feels clumsy as it inverts the flow of control that Operation creation otherwise uses without fully committing to the reactive stream model.
> 
> There may be a completely different API that is built around reactive streams from the ground up. That’s not this API. When we started work on this API, Flow did not exist so there was no support for reactive streams in Java SE. In consultation with the Class Library team we ended up using computational monads, i.e. CompletableFutures. This was not my first choice but it’s where we ended up.
> 
> Douglas
> 
>> On Oct 6, 2017, at 10:49 AM, Dávid Karnok <akarnokd at gmail.com> wrote:
>> 
>> Thanks for the answers.
>> 
>> > 4) We have made several attempts to use Flow more extensively but none have worked out. While it is easy enough to give a hand-waving description of a way to use Flow, the devil is in the details. We would be happy to better integrate with Flow but at this point we are out of ideas.
>> 
>> I happen to know a lot about programming with Reactive-Streams/Flow so I'm interested what those problems were.
>> 
>> 
>> 2017-10-06 17:55 GMT+02:00 Douglas Surber <douglas.surber at oracle.com>:
>> There have been a lot of comments wrt reactive streams and Flow. I have a few general comments.
>> 
>> 1) The PublisherOperation in the public version of the API should not have been included. It was created as part of an ultimately failed approach to using Flow. I thought I had created it in a separate branch but apparently not. It has been removed and will not be present in the next upload.
>> 
>> 2) The goal of the API is to be asynchronous. It is not a goal to use reactive streams. That is not to say it won’t use them where appropriate but it is not a goal.
>> 
>> 3) The Java Class Library Team strongly encouraged us to use CompletableFuture. While they did not discourage use of Flow they did not emphasize it. One of the design goals of the API is to integrate well with Java SE and CompletableFuture seemed to do that better than Flow.
>> 
>> 4) We have made several attempts to use Flow more extensively but none have worked out. While it is easy enough to give a hand-waving description of a way to use Flow, the devil is in the details. We would be happy to better integrate with Flow but at this point we are out of ideas. If the community thinks Flow is important, the best way to help that happen is by producing a detailed proposal in the form of a change set to the existing code. The proposal would have to follow the goals and design principles in the 2016 and 2017 JavaOne presentations and integrate well with the existing API and the rest of Java SE and EE.
>> 
>> 5) That said, one of the design principles for the async API is that one way to do something is enough. There are a thousand ways to solve almost any programming problem. An API can always be extended to provide tools to solve a problem in yet another way. For v1 at least the EG intends to keep the async API as small as possible. Adding a second way to do something that is already possible will require a very convincing use case.
>> 
>> 6) The current version of the API needs back pressure in two places, to limit the rate at which Operations are created/submitted and to limit the rate at which rows are processed. It (mis)uses Flow to meet those needs. This certainly could stand improvement.
>> 
>> Douglas
>> 
>> 
>> 
>> > On Oct 6, 2017, at 7:28 AM, Dávid Karnok <akarnokd at gmail.com> wrote:
>> >
>> > Hi. I have a couple of questions/suggestions about how the proposal uses
>> > j.u.c.Flow.
>> >
>> > As I understand, the proposed underlying non-blocking IO uses
>> > CompletableFutures, which are one shot async sources unlike Flow. Is this
>> > due to async NIO being based around CompletableFutures?
>> >
>> > I haven't delved too deeply into NIO but I'd think a Flow-based NIO could
>> > be possible (and perhaps lower overhead) - I understand such a thing is out
>> > of scope here but from an API perspective, would it be possible to have as
>> > few ties to CompletableFutures as possible? For example, have most of the
>> > API be Flow-based, while the underlying implementation uses
>> > CompletableFutures as sources of notifications?
>> >
>> > The OperationGroup.operationPublisher(Flow.Publisher<Operation>
>> > publisher) Javadoc has this statement:
>> >
>> > * Any {@link Operation} passed to {@link Flow.Subscriber#onNext} must be created by
>> > * this {@link OperationGroup}. If it is not {@link Flow.Subscriber#onNext} throws
>> > * {@link IllegalArgumentException}
>> >
>> >
>> > The Reactive-Streams spec forbids throwing from onNext. The only option, in
>> > general, for "invalid" onNext is to cancel the Subscription. I don't know
>> > how such an error could be propagated back to the user other than adding
>> > a failure state/signal to the returned OperationGroup.
>> >
>> > I think Row.isLast shouldn't return Future because one can only block or
>> > spin on a non-completed future. On the conceptual level, why would the
>> > row's consumer want to know this information asynchronously? In the Flow
>> > API, onComplete will eventually be called, which indicates that no more
>> > data is coming. On the practical level, isLast poses a concurrency problem
>> > because if the consumer calls isLast for multiple items, it will have
>> > multiple outstanding CompletableFutures, one of which may signal true while
>> > the processing of the last row is in progress. Now the consumer's
>> > implementor has to work out the concurrency implications of that.
>> >
>> > --
>> > Best regards,
>> > David Karnok
>> 
>> 
>> 
>> 
>> -- 
>> Best regards,
>> David Karnok
> 
> 
> 
> 
> -- 
> Best regards,
> David Karnok
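
Two points from the thread, that onNext must not throw and that a flow can be reduced to a single result, can be sketched together. This is only an illustration under stated assumptions: CountingSubscriber and countAccepted are hypothetical names, not part of the proposed API, and the JDK's SubmissionPublisher stands in for whatever would publish Operations or rows. An invalid item cancels the Subscription and fails a CompletableFuture instead of throwing.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Predicate;

final class ValidatingSubscriberSketch {

    // Hypothetical aggregator: counts accepted items and exposes the outcome
    // of the whole flow as one CompletableFuture.
    static final class CountingSubscriber<T> implements Flow.Subscriber<T> {
        private final Predicate<T> accepts;
        private final CompletableFuture<Integer> result = new CompletableFuture<>();
        private Flow.Subscription subscription;
        private int count;

        CountingSubscriber(Predicate<T> accepts) { this.accepts = accepts; }

        CompletableFuture<Integer> result() { return result; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);
        }

        @Override public void onNext(T item) {
            if (!accepts.test(item)) {
                // Do not throw from onNext: cancel upstream and report the
                // failure through the result instead.
                subscription.cancel();
                result.completeExceptionally(
                        new IllegalArgumentException("rejected item: " + item));
                return;
            }
            count++;
            subscription.request(1);
        }

        @Override public void onError(Throwable t) { result.completeExceptionally(t); }

        @Override public void onComplete() { result.complete(count); }
    }

    // Publishes the items through a SubmissionPublisher and returns the
    // subscriber's single result.
    static CompletableFuture<Integer> countAccepted(List<String> items,
                                                    Predicate<String> accepts) {
        CountingSubscriber<String> subscriber = new CountingSubscriber<>(accepts);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete after buffered items are delivered
        return subscriber.result();
    }

    public static void main(String[] args) {
        System.out.println(countAccepted(List.of("a", "b", "c"), s -> true).join());
        System.out.println(countAccepted(List.of("a", "", "c"), s -> !s.isEmpty())
                .handle((n, err) -> err == null ? "count=" + n : "failed: " + err.getMessage())
                .join());
    }
}
```

The caller sees the rejection as an exceptionally completed future, which is one way to give the "failure state/signal" a concrete shape.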


