Flow.Processor for Operation submission

Dávid Karnok akarnokd at gmail.com
Thu Oct 12 20:07:45 UTC 2017


I haven't looked at the full API, but would it be possible to invert the
Connection-OperationGroup relation? One could create operation groups and
operations in them freely, and once submitted to a connection, they would
become bound to that connection. Is there something that has to be associated
with groups well before they are submitted to a connection?
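
To make the idea concrete, here is a rough sketch of what such an inverted
relation could look like. Every name in it (UnboundGroup, UnboundOperation,
SketchConnection, bind) is made up for illustration and is not part of the
proposed API; a connection is reduced to a name plus a list of the SQL strings
it was handed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types for illustration only; none of them exist in the proposed API.
final class UnboundOperation {
    final String sql;

    UnboundOperation(String sql) {
        this.sql = sql;
    }
}

// A group that can be filled with operations before any connection is known.
final class UnboundGroup {
    private final List<UnboundOperation> operations = new ArrayList<>();
    private String boundTo; // stays null until the group is submitted

    UnboundOperation countOperation(String sql) {
        if (boundTo != null) {
            throw new IllegalStateException("group already submitted to " + boundTo);
        }
        UnboundOperation op = new UnboundOperation(sql);
        operations.add(op);
        return op;
    }

    // Called by the connection at submission time; a group binds exactly once.
    void bind(String connectionName) {
        if (boundTo != null) {
            throw new IllegalStateException("group already submitted to " + boundTo);
        }
        boundTo = connectionName;
    }

    List<UnboundOperation> operations() {
        return new ArrayList<>(operations);
    }

    String boundTo() {
        return boundTo;
    }
}

// Stand-in for a connection: it only records what it was asked to run.
final class SketchConnection {
    final String name;
    final List<String> executed = new ArrayList<>();

    SketchConnection(String name) {
        this.name = name;
    }

    void submit(UnboundGroup group) {
        group.bind(name); // the binding happens here, not at creation time
        for (UnboundOperation op : group.operations()) {
            executed.add(op.sql);
        }
    }
}
```

In this shape an operation never belongs to the "wrong" connection, because
it has no connection until its group is submitted; submitting the same group
a second time is the only remaining misuse, and the sketch rejects it.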

2017-10-12 21:53 GMT+02:00 Douglas Surber <douglas.surber at oracle.com>:

> There are two cases for values that would be improper to pass to onNext.
> In both cases the values are themselves proper. The error is passing them
> to onNext.
>
>   1) Passing an Operation that is a member of OperationGroup A to an
> operationProcessor for OperationGroup B.
>
>   2) Passing an Operation that has already been submitted.
>
> Code example:
>
>   void foo(Connection conn1, Connection conn2) {
>     String updateSql = null;
>     Flow.Subscription aSubscription = null;
>     conn1.countOperation(updateSql).submit(); // this is ok
>     Flow.Processor<Operation<Object>, Submission<Object>> p =
>         conn1.operationProcessor();
>     p.onSubscribe(aSubscription);
>     p.onNext(conn1.countOperation(updateSql)); // this is ok
>     conn2.countOperation(updateSql).submit(); // this is ok
>     // error case 1. Op created by conn2 passed to Processor for conn1.
>     p.onNext(conn2.countOperation(updateSql));
>     Operation x = conn1.countOperation(updateSql);
>     x.submit();
>     // error case 2. Op created by conn1 but already submitted
>     p.onNext(x);
>   }
>
> Note: Connection inherits from OperationGroup.
>
> Note 2: OperationGroup is a tool for managing errors. By default, if an
> Operation fails, all subsequent Operations in the same OperationGroup are
> skipped, that is, completed exceptionally with SqlSkippedException. So you
> can use OperationGroups to control which Operations are skipped and hence
> when execution resumes. OperationGroups have other capabilities.
> OperationGroups are Operation factories. Since Connections need to be
> Operation factories, Connections are OperationGroups. There are more or less
> reasonable use cases for using all of the capabilities of OperationGroup on
> Connection.
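
The default skipping behaviour described in Note 2 can be sketched roughly as
follows. This is only an illustration under assumptions: SkippingGroup and
SkippedException are hypothetical stand-ins (the latter for the
SqlSkippedException named above), operations run synchronously on the calling
thread, and a plain CompletableFuture plays the role of a Submission.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the SqlSkippedException mentioned in the thread.
final class SkippedException extends RuntimeException {
    SkippedException(Throwable cause) {
        super("skipped because an earlier operation in the group failed", cause);
    }
}

// Hypothetical group: after the first failure, later operations are not run.
final class SkippingGroup {
    private Throwable firstFailure; // null while every operation so far succeeded

    <T> CompletableFuture<T> run(Callable<T> operation) {
        CompletableFuture<T> result = new CompletableFuture<>();
        if (firstFailure != null) {
            // An earlier member failed: skip this one without executing it.
            result.completeExceptionally(new SkippedException(firstFailure));
            return result;
        }
        try {
            result.complete(operation.call());
        } catch (Exception e) {
            firstFailure = e; // remember the failure so later members are skipped
            result.completeExceptionally(e);
        }
        return result;
    }
}
```

Because the failure is recorded per group, operations placed in a different
SkippingGroup instance keep running, which is the sense in which groups decide
where execution resumes.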
>
> Note 3: A Connection close Operation is never skipped.
>
> Douglas
>
> On Oct 12, 2017, at 11:20 AM, Viktor Klang <v at lightbend.com> wrote:
>
> Hi Douglas,
>
> Would you mind elaborating a bit about these improper values? I.e. how do
> they manifest, and what would a Publisher do about it?
>
> (I'm super excited about the prospect of having an async JDBC!)
>
> Best regards,
>>
> On Oct 12, 2017 17:56, "Douglas Surber" <douglas.surber at oracle.com> wrote:
>
>> There are two error cases I need to specify. Each error case occurs when
>> the Publisher calls Subscriber.onNext with an improper value. So far as I
>> can comprehend in both cases the Subscriber calls Subscription.cancel.
>> That’s all well and good but I do not see any way for the Subscriber to
>> report which of the two cases provoked the call to cancel. What am I
>> missing?
>>
>> Further it is at least locally reasonable that the Subscriber tolerate
>> these improper values, reporting them as errors but allowing the Publisher
>> to send more values anyway. If we did choose to do this I don’t see any way
>> for the Subscriber to inform the Publisher that anything untoward happened
>> at all.
>>
>> Douglas
>>
>>
>> On Oct 12, 2017, at 8:20 AM, Konrad “ktoso” Malawski <ktosopl at gmail.com>
>> wrote:
>>
>> Hi Douglas, David,
>>
>> > After reading the JavaDoc it’s not clear to me that onNext cannot throw.
>> > The JavaDoc says “the resulting behavior is not guaranteed” but it doesn’t
>> > say that onNext cannot throw. Can you point to something that has more
>> > detail?
>>
>>
>> The full set of rules of Reactive Streams, and thus also of Flow.*
>> semantics, is available on the Reactive Streams site here:
>> https://github.com/reactive-streams/reactive-streams-jvm#specification
>>
>>
>> We (specifically; Viktor, initiator of RS, CCed here) have been in talks
>> with Doug Lea to include the actual specification rules
>> in JavaDoc of Flow as we felt this would happen (that people would not
>> realize there’s a full well-defined spec behind it).
>> I hope we’ll be able to get that done, as otherwise many people will be
>> left unaware of the rules that govern Flow implementations.
>> There also is a TCK available as TestNG tests:
>> https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck
>> I’ve been slow to update it to directly work on Flow.* types
>> (https://github.com/reactive-streams/reactive-streams-jvm/pull/398);
>> very sorry about that, and will try to get it completed very soon so Flow
>> impls can use it directly.
>>
>> Glad to see many RS contributors joining the discussion here, and
>> hopefully we’ll be able to help out making efficient use of Flow in at
>> least some parts of the API.
>>
>>
>> Similar APIs, that might be useful as examples in the discussion (I hope
>> that’s helpful):
>>
>> I have yet to read up on details about what Operation/OperationGroup/Submission
>> etc really mean semantically,
>> but if you want to model a back-pressured stream that back-pressures the
>> incoming operations and emits Submissions (are they like “results”? I’ll
>> read up more),
>> then modeling it as a Processor is indeed a good idea. This is how we
>> have modeled the back-pressured HTTP-client in Akka HTTP, the type is
>> (translated):
>> Processor<HttpRequest, HttpResponse>, which may be backed by multiple
>> connections or by a single one internally, so the pulling in requests is
>> handled appropriately.
>> Looking at the API proposed here it seems that’s what you’re after, so I
>> can confirm we have had such an API in the wild for some years and it
>> functions well.
>> The API I am talking about is here:
>> https://github.com/akka/akka-http/blob/master/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala#L380
>> (assume Flow is exactly like a Processor)
>>
>> Another example of reactive-streams with JDBC in action is Slick and
>> its wrappers in Alpakka; here are examples:
>> https://developer.lightbend.com/docs/alpakka/current/slick.html
>>
>> It’s pretty simple, you can click on the Java tab for inspiration (Sink
>> == Subscriber, Source == Publisher, Flow == Processor), and basically the
>> simplest version of an RS integration.
>>
>> Hope this helps,
>> happy to help with further design or general reactive-streams questions
>> (we’re trying to figure out how to help here the most efficiently
>> currently).
>>
>>
>> --
>> Cheers,
>> Konrad 'ktoso' Malawski (http://kto.so)
>> Akka (http://akka.io/) @ Lightbend (http://lightbend.com/)
>>
>> On October 12, 2017 at 23:54:21, Douglas Surber (
>> douglas.surber at oracle.com) wrote:
>>
>> After reading the JavaDoc it’s not clear to me that onNext cannot throw.
>> The JavaDoc says “the resulting behavior is not guaranteed” but it doesn’t
>> say that onNext cannot throw. Can you point to something that has more
>> detail?
>>
>> Douglas
>>
>>
>> > On Oct 12, 2017, at 7:33 AM, Douglas Surber <douglas.surber at oracle.com> wrote:
>> >
>> > Thanks for the correction with regard to onNext.
>> >
>> >> On Oct 11, 2017, at 3:54 PM, Dávid Karnok <akarnokd at gmail.com> wrote:
>> >>
>> >>> Calling Subscription.onNext with any
>> >>> * Operation that is not a member of this OperationGroup, that is
>> >>> * was not created by calling one of the Operation factory methods on this
>> >>> * OperationGroup, will throw IllegalArgumentException.
>> >>
>> >> Processor.onNext must not throw; therefore, an invalid onNext item
>> >> should cancel the Subscription and signal onError to the Publisher side
>> >> of the Processor.
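
One way to read this rule, as a minimal sketch on java.util.concurrent.Flow.
ValidatingProcessor and its isValid predicate are hypothetical; the predicate
stands in for checks such as "created by this OperationGroup and not yet
submitted". The sketch is deliberately simplified and is not a fully
spec-compliant Processor: it assumes a single downstream Subscriber that
subscribes before upstream signals arrive, it is not thread-safe, and
downstream demand is simply forwarded upstream.

```java
import java.util.concurrent.Flow;
import java.util.function.Predicate;

// Hypothetical pass-through Processor that rejects improper items without throwing.
final class ValidatingProcessor<T> implements Flow.Processor<T, T> {
    private final Predicate<T> isValid; // assumption: stands in for the real checks
    private Flow.Subscription upstream;
    private Flow.Subscriber<? super T> downstream;
    private boolean done;

    ValidatingProcessor(Predicate<T> isValid) {
        this.isValid = isValid;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        downstream = subscriber;
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                // Demand requested before upstream is known is dropped in this sketch.
                if (upstream != null) {
                    upstream.request(n);
                }
            }

            @Override
            public void cancel() {
                done = true;
                if (upstream != null) {
                    upstream.cancel();
                }
            }
        });
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        upstream = subscription;
    }

    @Override
    public void onNext(T item) {
        if (done) {
            return; // ignore anything that arrives after termination
        }
        if (!isValid.test(item)) {
            // Do not throw: cancel upstream and tell downstream what went wrong.
            done = true;
            upstream.cancel();
            downstream.onError(new IllegalArgumentException("improper item: " + item));
            return;
        }
        downstream.onNext(item);
    }

    @Override
    public void onError(Throwable throwable) {
        if (!done) {
            done = true;
            downstream.onError(throwable);
        }
    }

    @Override
    public void onComplete() {
        if (!done) {
            done = true;
            downstream.onComplete();
        }
    }
}
```

The exception handed to onError is where a Processor could say which kind of
improper item it saw, for example IllegalArgumentException for an Operation
from another group and IllegalStateException for one already submitted.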
>> >>
>> >>> Still thinking about how to publish rows.
>> >>
>> >> In rxjava-jdbc, ResultSet is used as a lambda parameter and the
>> >> instance shown to the lambda should not escape it, i.e.,
>> >>
>> >> <T> Flow.Publisher<T> convert(Function<Row, T> mapper);
>> >>
>> >> 2017-10-11 22:05 GMT+02:00 Douglas Surber <douglas.surber at oracle.com>:
>> >> I have found what at first glance appears to be a non-hacky way to use
>> >> Flow to channel Operations to an OperationGroup. It’s not perfect. The
>> >> limitation that published Operations must be member Operations of the
>> >> Operation group is unfortunate but absolutely necessary. Otherwise I think
>> >> it is ok.
>> >>
>> >> Delete Connection.onSubscribe.
>> >>
>> >> Add the following to OperationGroup:
>> >>
>> >> /**
>> >> * Returns a Flow.Processor that subscribes to a sequence of Operations and
>> >> * publishes a sequence of corresponding Submissions. The Operations must
>> >> * be members of this OperationGroup. Calling Subscription.onNext with any
>> >> * Operation that is not a member of this OperationGroup, that is
>> >> * was not created by calling one of the Operation factory methods on this
>> >> * OperationGroup, will throw IllegalArgumentException. The method
>> >> * Subscription.onNext will call submit on each Operation it is passed
>> >> * and publish the resulting Submission. Since an Operation can only be
>> >> * submitted once, submitting an Operation and calling onNext with that
>> >> * submitted Operation will throw IllegalStateException.
>> >> *
>> >> * Each call to this method returns a new Flow.Processor. The Submissions
>> >> * published to each Processor are exactly those generated by calling
>> >> * submit on the Operations passed as arguments to onNext on the same Processor.
>> >> *
>> >> * Note: If any Operation is submitted directly, that is by calling submit
>> >> * rather than passing it to onNext, the Submission returned by the submit
>> >> * call will not be published.
>> >> *
>> >> * @return a Flow.Processor that accepts Operations and generates Submissions
>> >> */
>> >> public Flow.Processor<Operation<T>, Submission<T>> operationProcessor();
>> >>
>> >> Comments?
>> >>
>> >> Still thinking about how to publish rows.
>> >>
>> >>
>> >> Douglas
>> >>
>> >>
>> >>
>> >> --
>> >> Best regards,
>> >> David Karnok
>>
>>
>>
>


-- 
Best regards,
David Karnok


More information about the jdbc-spec-discuss mailing list