Flow.Processor for Operation submission

Dávid Karnok akarnokd at gmail.com
Fri Oct 13 08:32:56 UTC 2017


>  this Operation is completed with null

Not sure where this null would go as onNext must not be called with null,
rule §2.13: https://github.com/reactive-streams/reactive-
streams-jvm/blob/master/README.md#2.13

2017-10-12 23:59 GMT+02:00 Douglas Surber <douglas.surber at oracle.com>:

> Viktor,
>
> Thanks. This helps a lot.
>
> Here is a draft of OperationGroup.operationProcessor. This is a very
> rough draft.
>
>   /**
>    * Returns a Flow.Processor that subscribes to a sequence of Operations
> and
>    * produces a sequence of corresponding Submissions. The Operations must
> be
>    * members of this OperationGroup. Calling Subscription.onNext with any
>    * Operation that is not a member of this OperationGroup, that is was not
>    * created by calling one of the Operation factory methods on this
>    * OperationGroup, will cause the Subscription to be canceled and call
>    * Subscriber.onError with IllegalArgumentException. The method
>    * Subscription.onNext will call submit on each Operation it is passed
> and
>    * publish the resulting Submission. Since an Operation can only be
> submitted
>    * once, submitting an Operation and calling onNext with that submitted
>    * Operation will cause the Subscription to be canceled and
> Subscriber.onError
>    * to be called with IllegalStateException. The Processor does not retain
>    * Submissions to produce to a subsequently attached Subscriber.
>    *
>    * If there is no Subscriber to the Processor, the Processor will request
>    * Operations as appropriate. If there is a Subscriber to the Processor,
> the
>    * Processor will request Operations no faster than the Subscriber
> requests
>    * Submissions.
>    *
>    * Each call to this method returns a new Flow.processor. The Submissions
>    * published to each Processor are exactly those generated by calling
> submit
>    * on the Operations passed as arguments to onNext on the same Processor.
>    *
>    * If there are multiple active operationProcessors on a single
> Connection
>    * the Connection will as much as possible keep the demand on all
> Publishers
>    * the same. In particular, if the Connection can accept more Operations
> then
>    * the demand on all Publishers should be greater than 0.
>    *
>    * Note: If any Operation is submitted directly, that is by calling
> submit
>    * rather than passing it to onNext, the Submission returned by the
> submit
>    * call will not be published.
>    *
>    * @return a Flow.Processor that accepts Operations and generates
> Submissions
>    */
>   public Flow.Processor<Operation<T>, Submission<T>> operationProcessor();
>
>
> And here is a very rough draft of a RowProcessorOperation
>
> public interface RowProcessorOperation<T> extends
> ParameterizedOperation<T> {
>
>   /** DRAFT
>    * Accepts a Processor that subscribes to a sequence of Rows and
> publishes
>    * a sequence of result values. The last result value published is the
> result
>    * of the Operation.
>    *
>    * The result of this Operation is the last value passed to the onNext
> method
>    * of the Subscriber passed to rowProcessor.subscribe.If onComplete
>    * is called before any value is passed to onNext this Operation is
> completed
>    * with null. If onError is called this Operation completes exceptionally
>    * with the passed exception. If neither onComplete or onError is called
>    * this Operation does not complete.
>    *
>    * Calling Row.cancel is the same as calling Subscription.cancel on the
>    * Subscription associated with the Row publisher.
>    *
>    * @param rowProcessor
>    * @return this RowProcessorOperation
>    */
>   public RowProcessorOperation<T> rowProcessor(Flow.Processor<Result.Row,
> T> rowProcessor);
>
>   // plus lots of covariant overrides: onError, set, etc
> }
>
> I’m having some difficulty finding the language to describe the various
> Publishers, Subscribers, and Subscriptions associated with the Processor
> argument. The point is the Processor argument consumes Rows and produces
> results (maybe only one or even none). The Processor must call onError or
> onComplete on the Subscriber<T> passed to processor.subscribe. Failure to
> do so will cause the Connection to hang which is very, very bad.
>
> I don’t like RowProcessorOperation because it is too easy to end up in a
> state where the Connection is unable to make progress. If the submitted
> Processor never calls onError or onComplete on the result Subscriber the
> Operation never completes and the Connection never makes progress and never
> closes.
>
> Douglas
>
>
> > On Oct 12, 2017, at 2:25 PM, Viktor Klang <v at lightbend.com> wrote:
> >
> >
> >
> > On Thu, Oct 12, 2017 at 10:04 PM, Douglas Surber <
> douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>> wrote:
> > Forgot to answer “what would a Publisher do about it”.
> >
> > Both cases are programming errors. They are not data errors or caused by
> an unexpected case. They are strictly the result of an error in program
> logic. At some level it would be reasonable to throw Error as they should
> never happen. If it does happen the app is broken and the code must change.
> So arguably there is no way to recover. The Publisher should abandon
> whatever request it is handling and try again either on the same request or
> a new one, hoping that execution does not follow the same erroneous code
> path.
> >
> > Since we're talking about Processors here it would seem completely
> appropriate for the Processor to cancel it's upstream Subscription and
> propagate an IllegalStateException downstream. This would effectively abort
> processing for the Publisher as well as propagate the exception to
> downstream dependencies, which could then clean up and exit gracefully—and
> of course ideally log or otherwise make the developer aware of the problem.
> >
> >
> > Douglas
> >
> >
> > > On Oct 12, 2017, at 12:53 PM, Douglas Surber <
> douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>> wrote:
> > >
> > > There are two cases for values that would be improper to pass to
> onNext. In both cases the values are themselves proper. The error is
> passing them to onNext.
> > >
> > >  1) Passing an Operation that is a member of OperationGroup A to an
> operationProcessor for OperationGroup B.
> > >
> > >  2) Passing an Operation that has already been submitted.
> > >
> > > Code example:
> > >
> > >  void foo(Connection conn1, Connection conn2) {
> > >    String updateSql = null;
> > >    Flow.Subscription aSubscription = null;
> > >    conn1.countOperation(updateSql).submit(); // this is ok
> > >    Flow.Processor<Operation<Object>, Submission<Object>> p =
> conn1.operationProcessor();
> > >    p.onSubscribe(aSubscription);
> > >    p.onNext(conn1.countOperation(updateSql)); // this is ok
> > >    conn2.countOperation(updateSql).submit(); // this is ok
> > >    p.onNext(conn2.countOperation(updateSql)); // error case 1. Op
> created by conn2 passed to Processor for conn1.
> > >    Operation x = conn1.countOperation(updateSql);
> > >    x.submit();
> > >    p.onNext(x);  // error case 2. Op created by conn1 but already
> submitted
> > >  }
> > >
> > > Note: Connection inherits from OperationGroup.
> > >
> > > Note 2: OperationGroup is a tool for managing errors. By default if an
> Operation fails all subsequent Operations in the same OperationGroup are
> skipped, are completed exceptionally with SqlSkippedException. So you can
> use OperationGroups to control which Operations are skipped and hence when
> execution resumes. OperationGroups have other capabilities. OperationGroups
> are Operation factories. Since Connections need to be Operation factories
> Connections are OperationGroups. There are more or less reasonable use
> cases for using all of the capabilities of OperationGroup on Connection.
> > >
> > > Note 3: A Connection close Operation is never skipped.
> > >
> > > Douglas
> > >
> > >> On Oct 12, 2017, at 11:20 AM, Viktor Klang <v at lightbend.com <mailto:
> v at lightbend.com>> wrote:
> > >>
> > >> Hi Douglas,
> > >>
> > >> Would you mind elaborating a bit about these improper values? I.e.
> how do they manifest, and what would a Publisher do about it?
> > >>
> > >> (I'm super excited about the prospect of having an async JDBC!)
> > >>
> > >> Best regards,
> > >> √
> > >>
> > >> On Oct 12, 2017 17:56, "Douglas Surber" <douglas.surber at oracle.com
> <mailto:douglas.surber at oracle.com> <mailto:douglas.surber at oracle.com
> <mailto:douglas.surber at oracle.com>>> wrote:
> > >> There are two error cases I need to specify. Each error case occurs
> when the Publisher calls Subscriber.onNext with an improper value. So far
> as I can comprehend in both cases the Subscriber calls Subscription.cancel.
> That’s all well and good but I do not see any way for the Subscriber to
> report which of the two cases provoked the call to cancel. What am I
> missing?
> > >>
> > >> Further it is at least locally reasonable that the Subscriber
> tolerate these improper values, reporting them as errors but allowing the
> Publisher to send more values anyway. If we did choose to do this I don’t
> see any way for the Subscriber to inform the Publisher that anything
> untoward happened at all.
> > >>
> > >> Douglas
> > >>
> > >>
> > >>> On Oct 12, 2017, at 8:20 AM, Konrad “ktoso” Malawski <
> ktosopl at gmail.com <mailto:ktosopl at gmail.com> <mailto:ktosopl at gmail.com
> <mailto:ktosopl at gmail.com>>> wrote:
> > >>>
> > >>> Hi Douglas, David,
> > >>>
> > >>> After reading the JavaDoc it’s not clear to me that onNext cannot
> throw. The JavaDoc says “the resulting behavior is not guaranteed” but it
> doesn’t say that onNext cannot throw. Can you point to something that has
> more detail?
> > >>>
> > >>>
> > >>> The full set of rules of Reactive Streams, and thus also, Flow.*
> semantics is
> > >>> available on the reactive streams site here:
> https://github.com/reactive-streams/reactive-streams-jvm#specification <
> https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_reactive-
> 2Dstreams_reactive-2Dstreams-2Djvm-23specification&d=DwMFaQ&c=
> RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_
> sNBRUO0uuqD6z6ltcMO-LbA&m=GQq3mobCwlVytQ06n_vc_nsgONZ7omvbH7VpkhhB9jE&s=
> U5hSy3n6qC2rPhr9I9lGDbbqiXdslFVBUushU50xJ1g&e=> <https://urldefense.
> proofpoint.com/v2/url?u=https-3A__github.com_reactive-
> 2Dstreams_reactive-2Dstreams-2Djvm-23specification&d=DwMFaQ&c=
> RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_
> sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=
> 8PKpeAVkkcN6hA6GJ2zcEVqI3ohcI5138zxnEdjgetA&e= <https://urldefense.
> proofpoint.com/v2/url?u=https-3A__github.com_reactive-
> 2Dstreams_reactive-2Dstreams-2Djvm-23specification&d=DwMFaQ&c=
> RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_
> sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=
> 8PKpeAVkkcN6hA6GJ2zcEVqI3ohcI5138zxnEdjgetA&e=>>
> > >>>
> > >>> We (specifically; Viktor, initiator of RS, CCed here) have been in
> talks with Doug Lea to include the actual specification rules
> > >>> in JavaDoc of Flow as we felt this would happen (that people would
> not realize there’s a full well-defined spec behind it).
> > >>> I hope we’ll be able to get that done, as otherwise many people will
> be left unaware of the rules that govern Flow implementations.
> > >>> There also is a TCK available <https://urldefense.
> proofpoint.com/v2/url?u=https-3A__github.com_reactive-
> 2Dstreams_reactive-2Dstreams-2Djvm_tree_master_tck&d=DwMFaQ&c=
> RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_
> sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=
> mGgS1KAtK30Xw6rPq4K6cOFAL4UN3rz6mI8wLqQiUUA&e= <https://urldefense.
> proofpoint.com/v2/url?u=https-3A__github.com_reactive-
> 2Dstreams_reactive-2Dstreams-2Djvm_tree_master_tck&d=DwMFaQ&c=
> RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_
> sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=
> mGgS1KAtK30Xw6rPq4K6cOFAL4UN3rz6mI8wLqQiUUA&e=>> as testng tests. I’ve
> been slow to update it <https://urldefense.proofpoint.com/v2/url?u=https-
> 3A__github.com_reactive-2Dstreams_reactive-2Dstreams-
> 2Djvm_pull_398&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=
> ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_
> EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=xC07YG9iqW5h0mXN1exu72kW4TUTNr
> _YmhxG94BMKqw&e= <https://urldefense.proofpoint.com/v2/url?u=https-
> 3A__github.com_reactive-2Dstreams_reactive-2Dstreams-
> 2Djvm_pull_398&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=
> ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_
> EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=xC07YG9iqW5h0mXN1exu72kW4TUTNr
> _YmhxG94BMKqw&e=>> to directly work on Flow.* types – very sorry about
> that, and will try to get it completed very soon so Flow impls can use it
> directly.
> > >>>
> > >>> Glad to see many RS contributors joining the discussion here, and
> hopefully we’ll be able to help out making efficient use of Flow in at
> least some parts of the API.
> > >>>
> > >>>
> > >>> Similar APIs, that might be useful as examples in the discussion (I
> hope that’s helpful):
> > >>>
> > >>> I have yet to read up on details about what Operation/OperationGroup/Submission
> etc really mean semantically,
> > >>> but if you want to model a back-pressured stream that back-pressures
> the incoming operations and emits Submissions (are they like “results”?
> I’ll read up more),
> > >>> then modeling it as a Processor is indeed a good idea. This is how
> we have modeled the back-pressured HTTP-client in Akka HTTP, the type is
> (translated):
> > >>> Processor<HttpRequest, HttpResponse>, which may be backed by
> multiple connections or by a single one internally, so the pulling in
> requests is handled appropriately.
> > >>> Looking at the API proposed here it seems that’s what you’re after;
> so I can confirm we have such API in the wild since some years and it
> functions well.
> > >>> The API I talk about is here https://github.com/akka/akka-
> http/blob/master/akka-http-core/src/main/scala/akka/http/
> javadsl/Http.scala#L380 <https://urldefense.proofpoint.com/v2/url?u=https-
> 3A__github.com_akka_akka-2Dhttp_blob_master_akka-
> 2Dhttp-2Dcore_src_main_scala_akka_http_javadsl_Http.scala-
> 23L380&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=
> ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=GQq3mobCwlVytQ06n_vc_
> nsgONZ7omvbH7VpkhhB9jE&s=tXyJy5d2KOCmopTbLD-3A4TinXEBbfIcrtfSfAId2kg&e=> <
> https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_akka_akka-
> 2Dhttp_blob_master_akka-2Dhttp-2Dcore_src_main_scala_
> akka_http_javadsl_Http.scala-23L380&d=DwMFaQ&c=
> RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_
> sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=
> Ji0jhIlx0uZcHaIXJmiv3P03oR_O4bq2HzBL1FprLoQ&e= <https://urldefense.
> proofpoint.com/v2/url?u=https-3A__github.com_akka_akka-
> 2Dhttp_blob_master_akka-2Dhttp-2Dcore_src_main_scala_
> akka_http_javadsl_Http.scala-23L380&d=DwMFaQ&c=
> RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_
> sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=
> Ji0jhIlx0uZcHaIXJmiv3P03oR_O4bq2HzBL1FprLoQ&e=>> (assume Flow is exactly
> like a Processor)
> > >>>
> > >>> Another example of reactive-streams with JDBC in action is Slick,
> and it’s wrappers in Alpakka, here’s examples:
> https://developer.lightbend.com/docs/alpakka/current/slick.html <
> https://urldefense.proofpoint.com/v2/url?u=https-
> 3A__developer.lightbend.com_docs_alpakka_current_slick.html&d=DwMFaQ&c=
> RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_
> sNBRUO0uuqD6z6ltcMO-LbA&m=GQq3mobCwlVytQ06n_vc_nsgONZ7omvbH7VpkhhB9jE&s=_
> DOxo6WFHb8G1OxQ07m3svqcaKCDR_8R8_ZSQG3v0gI&e=> <https://urldefense.
> proofpoint.com/v2/url?u=https-3A__developer.lightbend.com_
> docs_alpakka_current_slick.html&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5Y
> TpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_
> EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=QowbfwYtawd90nnK4Ubzra0xVXUigK
> lX4jX5YCgSEds&e= <https://urldefense.proofpoint.com/v2/url?u=https-
> 3A__developer.lightbend.com_docs_alpakka_current_slick.html&d=DwMFaQ&c=
> RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_
> sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=
> QowbfwYtawd90nnK4Ubzra0xVXUigKlX4jX5YCgSEds&e=>>
> > >>> It’s pretty simple, you can click on the Java tab for inspiration
> (Sink == Subscriber, Source == Publisher, Flow == Processor), and basically
> the simplest version of an RS integration.
> > >>>
> > >>> Hope this helps,
> > >>> happy to help with further design or general reactive-streams
> questions (we’re trying to figure out how to help here the most efficiently
> currently).
> > >>>
> > >>>
> > >>> --
> > >>> Cheers,
> > >>> Konrad 'ktoso <https://urldefense.proofpoint.com/v2/url?u=http-
> 3A__kto.so&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=
> ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_
> EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=x4S1kDPFg6JwQ9mRlkvN22YRVOpNwi
> B-5BgAITRIsv8&e= <https://urldefense.proofpoint.com/v2/url?u=http-
> 3A__kto.so&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=
> ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_
> EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=x4S1kDPFg6JwQ9mRlkvN22YRVOpNwi
> B-5BgAITRIsv8&e=>>' Malawski
> > >>> Akka <https://urldefense.proofpoint.com/v2/url?u=http-
> 3A__akka.io_&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=
> ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_
> EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=5b5-GC2k85BKW51yEAecgwQYujsGU-
> PGmIBOts8xY6w&e= <https://urldefense.proofpoint.com/v2/url?u=http-
> 3A__akka.io_&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=
> ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_
> EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=5b5-GC2k85BKW51yEAecgwQYujsGU-
> PGmIBOts8xY6w&e=>> @ Lightbend <https://urldefense.
> proofpoint.com/v2/url?u=http-3A__lightbend.com_&d=DwMFaQ&c=
> RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_
> sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDU
> F6ks&s=CBQKBaKMiqEV5R_eUsO3lULxZXmBerKDlw0Gsyw2pG0&e= <https://urldefense.
> proofpoint.com/v2/url?u=http-3A__lightbend.com_&d=DwMFaQ&c=
> RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_
> sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDU
> F6ks&s=CBQKBaKMiqEV5R_eUsO3lULxZXmBerKDlw0Gsyw2pG0&e=>>
> > >>> On October 12, 2017 at 23:54:21, Douglas Surber (
> douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:
> douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>) wrote:
> > >>>
> > >>>> After reading the JavaDoc it’s not clear to me that onNext cannot
> throw. The JavaDoc says “the resulting behavior is not guaranteed” but it
> doesn’t say that onNext cannot throw. Can you point to something that has
> more detail?
> > >>>>
> > >>>> Douglas
> > >>>>
> > >>>>
> > >>>>> On Oct 12, 2017, at 7:33 AM, Douglas Surber <
> douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:
> douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>> wrote:
> > >>>>>
> > >>>>> Thanks for the correction with regard to onNext.
> > >>>>>
> > >>>>>> On Oct 11, 2017, at 3:54 PM, Dávid Karnok <akarnokd at gmail.com
> <mailto:akarnokd at gmail.com> <mailto:akarnokd at gmail.com <mailto:
> akarnokd at gmail.com>> <mailto:akarnokd at gmail.com <mailto:akarnokd at gmail.com
> ><mailto:akarnokd at gmail.com <mailto:akarnokd at gmail.com>>>> wrote:
> > >>>>>>
> > >>>>>>> Calling Subscription.onNext with any
> > >>>>>>> * Operation that is not a member of this OperationGroup, that is
> > >>>>>>> * was not created by calling one of the Operation factory
> methods on this
> > >>>>>>> * OperationGroup, will throw IllegalArgumentException.
> > >>>>>>
> > >>>>>> Processor.onNext must not throw, therefore an invalid onNext item
> should cancel the Subscription and signal onError to the Publisher side of
> the Processor.
> > >>>>>>
> > >>>>>>> Still thinking about how to publish rows.
> > >>>>>>
> > >>>>>> In rxjava-jdbc, ResultSet is used as a lambda parameter and the
> instance shown to the lambda should not escape it, i.e.,
> > >>>>>>
> > >>>>>> <T> Flow.Publisher<T> convert(Function<Row, T> mapper);
> > >>>>>>
> > >>>>>> 2017-10-11 22:05 GMT+02:00 Douglas Surber <
> douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:
> douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>> <mailto:
> douglas.surber at oracle.com <mailto:douglas.surber at oracle.com><mailto:
> douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>> <mailto:
> douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:
> douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>> <mailto:
> douglas.surber at oracle.com <mailto:douglas.surber at oracle.com><mailto:
> douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>>>>:
> > >>>>>> I have found what at first glance appears to be a non-hacky way
> to use Flow to channel Operations to an OperationGroup. It’s not perfect.
> The limitation that published Operations must be member Operations of the
> Operation group is unfortunate but absolutely necessary. Otherwise I think
> it is ok.
> > >>>>>>
> > >>>>>> Delete Connection.onSubscribe.
> > >>>>>>
> > >>>>>> Add the following to OperationGroup:
> > >>>>>>
> > >>>>>> /**
> > >>>>>> * Returns a Flow.Processor that subscribes to a sequence of
> Operations and
> > >>>>>> * publishes a sequence of corresponding Submissions. The
> Operations must
> > >>>>>> * be members of this OperationGroup. Calling Subscription.onNext
> with any
> > >>>>>> * Operation that is not a member of this OperationGroup, that is
> > >>>>>> * was not created by calling one of the Operation factory methods
> on this
> > >>>>>> * OperationGroup, will throw IllegalArgumentException. The method
> > >>>>>> * Subscription.onNext will call submit on each Operation it is
> passed
> > >>>>>> * and publish the resulting Submission. Since an Operation can
> only be
> > >>>>>> * submitted once, submitting an Operation and calling onNext with
> that
> > >>>>>> * submitted Operation will throw IllegalStateException.
> > >>>>>> *
> > >>>>>> * Each call to this method returns a new Flow.processor. The
> Submissions
> > >>>>>> * published to each Processor are exactly those generated by
> calling
> > >>>>>> * submit on the Operations passed as arguments to onNext on the
> same Processor.
> > >>>>>> *
> > >>>>>> * Note: If any Operation is submitted directly, that is by
> calling submit
> > >>>>>> * rather than passing it to onNext, the Submission returned by
> the submit
> > >>>>>> * call will not be published.
> > >>>>>> *
> > >>>>>> * @return a Flow.Processor that accepts Operations and generates
> Submissions
> > >>>>>> */
> > >>>>>> public Flow.Processor<Operation<T>, Submission<T>>
> operationProcessor();
> > >>>>>>
> > >>>>>> Comments?
> > >>>>>>
> > >>>>>> Still thinking about how to publish rows.
> > >>>>>>
> > >>>>>>
> > >>>>>> Douglas
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> --
> > >>>>>> Best regards,
> > >>>>>> David Karnok
> > >>
> > >
> >
> >
> >
> >
> > --
> > Cheers,
> > √
> >
> > ——————
> > Viktor Klang
> > Deputy CTO
> > Lightbend, Inc.
>
>


-- 
Best regards,
David Karnok


More information about the jdbc-spec-discuss mailing list