Flow.Processor for Operation submission

Viktor Klang v at lightbend.com
Thu Oct 12 18:20:18 UTC 2017


Hi Douglas,

Would you mind elaborating a bit about these improper values? I.e. how do
they manifest, and what would a Publisher do about it?

(I'm super excited about the prospect of having an async JDBC!)

Best regards,
√

On Oct 12, 2017 17:56, "Douglas Surber" <douglas.surber at oracle.com> wrote:

> There are two error cases I need to specify. Each error case occurs when
> the Publisher calls Subscriber.onNext with an improper value. So far as I
> can comprehend in both cases the Subscriber calls Subscription.cancel.
> That’s all well and good but I do not see any way for the Subscriber to
> report which of the two cases provoked the call to cancel. What am I
> missing?
>
> Further it is at least locally reasonable that the Subscriber tolerate
> these improper values, reporting them as errors but allowing the Publisher
> to send more values anyway. If we did choose to do this I don’t see any way
> for the Subscriber to inform the Publisher that anything untoward happened
> at all.
>
> Douglas
>
>
> On Oct 12, 2017, at 8:20 AM, Konrad “ktoso” Malawski <ktosopl at gmail.com>
> wrote:
>
> Hi Douglas, David,
>
> After reading the JavaDoc it’s not clear to me that onNext cannot throw.
> The JavaDoc says “the resulting behavior is not guaranteed” but it doesn’t
> say that onNext cannot throw. Can you point to something that has more
> detail?
>
>
> The full set of rules of Reactive Streams, and thus also the Flow.*
> semantics, is available on the reactive streams site here:
> https://github.com/reactive-streams/reactive-streams-jvm#specification
>
>
> We (specifically Viktor, initiator of RS, CCed here) have been in talks
> with Doug Lea to include the actual specification rules in the JavaDoc of
> Flow, as we felt this would happen (that people would not realize there’s
> a full, well-defined spec behind it).
> I hope we’ll be able to get that done, as otherwise many people will be
> left unaware of the rules that govern Flow implementations.
> There is also a TCK available
> <https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck>
> as TestNG tests. I’ve been slow to update it
> <https://github.com/reactive-streams/reactive-streams-jvm/pull/398> to
> work directly on Flow.* types – very sorry about that, and will try to get
> it completed very soon so Flow impls can use it directly.
>
> Glad to see many RS contributors joining the discussion here, and
> hopefully we’ll be able to help out making efficient use of Flow in at
> least some parts of the API.
>
>
> Similar APIs, that might be useful as examples in the discussion (I hope
> that’s helpful):
>
> I have yet to read up on details about what Operation/OperationGroup/Submission
> etc really mean semantically,
> but if you want to model a back-pressured stream that back-pressures the
> incoming operations and emits Submissions (are they like “results”? I’ll
> read up more),
> then modeling it as a Processor is indeed a good idea. This is how we have
> modeled the back-pressured HTTP-client in Akka HTTP, the type is
> (translated):
> Processor<HttpRequest, HttpResponse>, which may be backed by multiple
> connections or by a single one internally, so the pulling in requests is
> handled appropriately.
> Looking at the API proposed here, it seems that’s what you’re after, so I
> can confirm we have had such an API in the wild for some years and it
> functions well.
> The API I’m talking about is here:
> https://github.com/akka/akka-http/blob/master/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala#L380
> (assume Flow is exactly like a Processor)
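
The request-in, response-out shape described above can be sketched with the JDK's own SubmissionPublisher, following the pattern shown in its JavaDoc. This is only an illustration under assumptions: `MappingProcessor` and `MappingProcessorDemo` are hypothetical names, plain Strings stand in for HttpRequest/HttpResponse (or Operation/Submission), and nothing here is taken from Akka HTTP or the proposed JDBC API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical processor: consumes I, publishes handler(I) to its own subscribers. */
final class MappingProcessor<I, O> extends SubmissionPublisher<O>
        implements Flow.Processor<I, O> {
    private final Function<? super I, ? extends O> handler;
    private Flow.Subscription upstream;

    MappingProcessor(Function<? super I, ? extends O> handler) {
        this.handler = handler;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        s.request(1);                      // pull one request at a time
    }

    @Override public void onNext(I request) {
        submit(handler.apply(request));    // may block while downstream buffers are full
        upstream.request(1);               // ask for the next request only afterwards
    }

    @Override public void onError(Throwable t) { closeExceptionally(t); }
    @Override public void onComplete() { close(); }
}

/** Hypothetical driver that wires a source, the processor, and a collecting consumer. */
final class MappingProcessorDemo {
    static List<String> run() {
        List<String> responses = new ArrayList<>();
        SubmissionPublisher<String> requests = new SubmissionPublisher<>();
        MappingProcessor<String, String> client =
                new MappingProcessor<>(r -> "200 OK for " + r);
        requests.subscribe(client);
        CompletableFuture<Void> done = client.consume(responses::add);
        requests.submit("GET /a");
        requests.submit("GET /b");
        requests.close();                  // completes the processor, then the consumer
        done.orTimeout(5, TimeUnit.SECONDS).join();
        return responses;
    }
}
```

The sketch assumes the handler itself does not throw; a real implementation would have to turn such a failure into a cancel plus onError rather than let it escape from onNext.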
>
> Another example of reactive-streams with JDBC in action is Slick, and its
> wrappers in Alpakka; here are examples:
> https://developer.lightbend.com/docs/alpakka/current/slick.html
>
> It’s pretty simple; you can click on the Java tab for inspiration (Sink ==
> Subscriber, Source == Publisher, Flow == Processor), and it is basically
> the simplest version of an RS integration.
>
> Hope this helps,
> happy to help with further design or general reactive-streams questions
> (we’re currently trying to figure out how to help here most efficiently).
>
>
> --
> Cheers,
> Konrad 'ktoso' Malawski <http://kto.so>
> Akka <http://akka.io/> @ Lightbend <http://lightbend.com/>
>
> On October 12, 2017 at 23:54:21, Douglas Surber (douglas.surber at oracle.com)
> wrote:
>
> After reading the JavaDoc it’s not clear to me that onNext cannot throw.
> The JavaDoc says “the resulting behavior is not guaranteed” but it doesn’t
> say that onNext cannot throw. Can you point to something that has more
> detail?
>
> Douglas
>
>
> > On Oct 12, 2017, at 7:33 AM, Douglas Surber <douglas.surber at oracle.com> wrote:
> >
> > Thanks for the correction with regard to onNext.
> >
> >> On Oct 11, 2017, at 3:54 PM, Dávid Karnok <akarnokd at gmail.com> wrote:
> >>
> >>> Calling Subscriber.onNext with any
> >>> * Operation that is not a member of this OperationGroup, that is, one that
> >>> * was not created by calling one of the Operation factory methods on this
> >>> * OperationGroup, will throw IllegalArgumentException.
> >>
> >> Processor.onNext must not throw, therefore an invalid onNext item should
> >> cancel the Subscription and signal onError to the Publisher side of the
> >> Processor.
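
The suggestion above can be sketched roughly as follows. This is a simplified, single-threaded illustration, not the proposed API: `GroupProcessor`, the use of String in place of Operation and Submission, and the `GroupProcessorDemo` driver are all hypothetical. The only point is that onNext never throws; each of the two error cases cancels upstream and reaches the downstream Subscriber through onError with a distinct exception type.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Flow;

/** Hypothetical sketch; assumes subscribe() and onSubscribe() happen before any onNext(). */
final class GroupProcessor implements Flow.Processor<String, String> {
    private final Set<String> members;                  // stand-in for "Operations of this group"
    private final Set<String> submitted = new HashSet<>();
    private Flow.Subscription upstream;
    private Flow.Subscriber<? super String> downstream;
    private boolean done;

    GroupProcessor(Set<String> members) { this.members = members; }

    @Override public void subscribe(Flow.Subscriber<? super String> s) {
        downstream = s;
        s.onSubscribe(new Flow.Subscription() {
            // Demand and cancellation are simply forwarded upstream in this sketch.
            @Override public void request(long n) { if (upstream != null) upstream.request(n); }
            @Override public void cancel() { if (upstream != null) upstream.cancel(); }
        });
    }

    @Override public void onSubscribe(Flow.Subscription s) { upstream = s; }

    @Override public void onNext(String op) {
        if (done) return;
        if (!members.contains(op)) {                    // error case 1: not a member
            fail(new IllegalArgumentException("not a member: " + op));
        } else if (!submitted.add(op)) {                // error case 2: already submitted
            fail(new IllegalStateException("already submitted: " + op));
        } else {
            downstream.onNext("submission:" + op);
        }
    }

    /** Cancels upstream and reports the cause downstream instead of throwing. */
    private void fail(RuntimeException cause) {
        done = true;
        upstream.cancel();
        downstream.onError(cause);
    }

    @Override public void onError(Throwable t) { if (!done) { done = true; downstream.onError(t); } }
    @Override public void onComplete() { if (!done) { done = true; downstream.onComplete(); } }
}

/** Hypothetical driver: pushes items straight into onNext and logs what happens. */
final class GroupProcessorDemo {
    static String run(String... ops) {
        StringBuilder log = new StringBuilder();
        GroupProcessor p = new GroupProcessor(Set.of("a", "b"));
        p.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(String item) { log.append(item).append(';'); }
            @Override public void onError(Throwable t) { log.append(t.getClass().getSimpleName()).append(';'); }
            @Override public void onComplete() { log.append("complete;"); }
        });
        p.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { log.append("cancel;"); }
        });
        for (String op : ops) p.onNext(op);
        return log.toString();
    }
}
```

The driver ignores demand and calls onNext directly, which a real Publisher must not do; it is there only to make the two error paths visible.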
> >>
> >>> Still thinking about how to publish rows.
> >>
> >> In rxjava-jdbc, ResultSet is used as a lambda parameter and the instance
> >> shown to the lambda should not escape it, i.e.,
> >>
> >> <T> Flow.Publisher<T> convert(Function<Row, T> mapper);
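
One way to read the signature above is sketched below, with loud assumptions: `Row`, `RowSource`, `ScopedRow` and `RowDemo` are hypothetical, the rows come from an in-memory list rather than a database, and the Subscription is single-threaded and simplified. It shows the mapper being applied to a row view that is invalidated as soon as the lambda returns, so only the mapped value is published.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Flow;
import java.util.function.Function;

/** Hypothetical stand-in for a result row; not a type from the proposed API. */
interface Row { String get(String column); }

final class RowSource {
    private final List<Map<String, String>> data;
    RowSource(List<Map<String, String>> data) { this.data = data; }

    /** A Row view that stops working once the mapper has returned. */
    private static final class ScopedRow implements Row {
        private Map<String, String> raw;
        ScopedRow(Map<String, String> raw) { this.raw = raw; }
        @Override public String get(String column) {
            if (raw == null) throw new IllegalStateException("Row used outside the mapper");
            return raw.get(column);
        }
        void invalidate() { raw = null; }
    }

    <T> Flow.Publisher<T> convert(Function<Row, T> mapper) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next;
            private long demand;
            private boolean emitting;   // guards against re-entrant request() calls
            private boolean done;

            @Override public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                if (emitting) return;
                emitting = true;
                while (!done && demand > 0 && next < data.size()) {
                    demand--;
                    ScopedRow row = new ScopedRow(data.get(next++));
                    T item;
                    try { item = mapper.apply(row); }
                    catch (RuntimeException e) { done = true; subscriber.onError(e); break; }
                    finally { row.invalidate(); }   // the row must not outlive the lambda
                    subscriber.onNext(item);
                }
                emitting = false;
                if (!done && next == data.size()) { done = true; subscriber.onComplete(); }
            }

            @Override public void cancel() { done = true; }
        });
    }
}

/** Hypothetical helpers for trying the sketch out. */
final class RowDemo {
    static RowSource source() {
        return new RowSource(List.of(Map.of("name", "ada"), Map.of("name", "bob")));
    }

    /** Subscribes with unbounded demand and collects the published items. */
    static <T> List<T> collect(Flow.Publisher<T> publisher) {
        List<T> out = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { out.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return out;
    }
}
```

With this shape, `RowDemo.collect(RowDemo.source().<String>convert(r -> r.get("name")))` collects the mapped names, while a mapper that returns the row itself hands the subscriber an object that fails on first use.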
> >>
> >> 2017-10-11 22:05 GMT+02:00 Douglas Surber <douglas.surber at oracle.com>:
> >> I have found what at first glance appears to be a non-hacky way to use
> >> Flow to channel Operations to an OperationGroup. It’s not perfect. The
> >> limitation that published Operations must be member Operations of the
> >> Operation group is unfortunate but absolutely necessary. Otherwise I think
> >> it is ok.
> >>
> >> Delete Connection.onSubscribe.
> >>
> >> Add the following to OperationGroup:
> >>
> >> /**
> >> * Returns a Flow.Processor that subscribes to a sequence of Operations and
> >> * publishes a sequence of corresponding Submissions. The Operations must
> >> * be members of this OperationGroup. Calling Subscriber.onNext with any
> >> * Operation that is not a member of this OperationGroup, that is, one that
> >> * was not created by calling one of the Operation factory methods on this
> >> * OperationGroup, will throw IllegalArgumentException. The method
> >> * Subscriber.onNext will call submit on each Operation it is passed
> >> * and publish the resulting Submission. Since an Operation can only be
> >> * submitted once, submitting an Operation and calling onNext with that
> >> * submitted Operation will throw IllegalStateException.
> >> *
> >> * Each call to this method returns a new Flow.Processor. The Submissions
> >> * published to each Processor are exactly those generated by calling
> >> * submit on the Operations passed as arguments to onNext on the same
> >> * Processor.
> >> *
> >> * Note: If any Operation is submitted directly, that is by calling submit
> >> * rather than passing it to onNext, the Submission returned by the submit
> >> * call will not be published.
> >> *
> >> * @return a Flow.Processor that accepts Operations and generates Submissions
> >> */
> >> public Flow.Processor<Operation<T>, Submission<T>> operationProcessor();
> >>
> >> Comments?
> >>
> >> Still thinking about how to publish rows.
> >>
> >>
> >> Douglas
> >>
> >>
> >>
> >> --
> >> Best regards,
> >> David Karnok
>
>
>


More information about the jdbc-spec-discuss mailing list