Flow.Processor for Operation submission

Douglas Surber douglas.surber at oracle.com
Fri Oct 13 14:16:45 UTC 2017


All Operations have a result value. The type parameter T to Operation is the type of that Operation’s result. This result is made available via Submission.toCompletableFuture. None of this has anything to do with Flow.

Douglas

> On Oct 13, 2017, at 1:32 AM, Dávid Karnok <akarnokd at gmail.com> wrote:
> 
> >  this Operation is completed with null
> 
> Not sure where this null would go as onNext must not be called with null, rule §2.13: https://github.com/reactive-streams/reactive-streams-jvm/blob/master/README.md#2.13 <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_reactive-2Dstreams_reactive-2Dstreams-2Djvm_blob_master_README.md-232.13&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=oePTMeCSKhFcGmsBuATg4fnYIi_sTP95MfF4tF3hy1k&s=5MUyTttn-Yo5dwqPY6jVaq5THLIzHoZkn81IqwAvF6Y&e=>
> 
> 2017-10-12 23:59 GMT+02:00 Douglas Surber <douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>:
> Viktor,
> 
> Thanks. This helps a lot.
> 
> Here is a draft of OperationGroup.operationProcessor. This is a very rough draft.
> 
>   /**
>    * Returns a Flow.Processor that subscribes to a sequence of Operations and
>    * produces a sequence of corresponding Submissions. The Operations must be
>    * members of this OperationGroup. Calling Subscription.onNext with any
>    * Operation that is not a member of this OperationGroup, that is was not
>    * created by calling one of the Operation factory methods on this
>    * OperationGroup, will cause the Subscription to be canceled and call
>    * Subscriber.onError with IllegalArgumentException. The method
>    * Subscription.onNext will call submit on each Operation it is passed and
>    * publish the resulting Submission. Since an Operation can only be submitted
>    * once, submitting an Operation and calling onNext with that submitted
>    * Operation will cause the Subscription to be canceled and Subscriber.onError
>    * to be called with IllegalStateException. The Processor does not retain
>    * Submissions to produce to a subsequently attached Subscriber.
>    *
>    * If there is no Subscriber to the Processor, the Processor will request
>    * Operations as appropriate. If there is a Subscriber to the Processor, the
>    * Processor will request Operations no faster than the Subscriber requests
>    * Submissions.
>    *
>    * Each call to this method returns a new Flow.processor. The Submissions
>    * published to each Processor are exactly those generated by calling submit
>    * on the Operations passed as arguments to onNext on the same Processor.
>    *
>    * If there are multiple active operationProcessors on a single Connection
>    * the Connection will as much as possible keep the demand on all Publishers
>    * the same. In particular, if the Connection can accept more Operations then
>    * the demand on all Publishers should be greater than 0.
>    *
>    * Note: If any Operation is submitted directly, that is by calling submit
>    * rather than passing it to onNext, the Submission returned by the submit
>    * call will not be published.
>    *
>    * @return a Flow.Processor that accepts Operations and generates Submissions
>    */
>   public Flow.Processor<Operation<T>, Submission<T>> operationProcessor();
> 
> 
> And here is a very rough draft of a RowProcessorOperation
> 
> public interface RowProcessorOperation<T> extends ParameterizedOperation<T> {
> 
>   /** DRAFT
>    * Accepts a Processor that subscribes to a sequence of Rows and publishes
>    * a sequence of result values. The last result value published is the result
>    * of the Operation.
>    *
>    * The result of this Operation is the last value passed to the onNext method
>    * of the Subscriber passed to rowProcessor.subscribe.If onComplete
>    * is called before any value is passed to onNext this Operation is completed
>    * with null. If onError is called this Operation completes exceptionally
>    * with the passed exception. If neither onComplete or onError is called
>    * this Operation does not complete.
>    *
>    * Calling Row.cancel is the same as calling Subscription.cancel on the
>    * Subscription associated with the Row publisher.
>    *
>    * @param rowProcessor
>    * @return this RowProcessorOperation
>    */
>   public RowProcessorOperation<T> rowProcessor(Flow.Processor<Result.Row, T> rowProcessor);
> 
>   // plus lots of covariant overrides: onError, set, etc
> }
> 
> I’m having some difficulty finding the language to describe the various Publishers, Subscribers, and Subscriptions associated with the Processor argument. The point is the Processor argument consumes Rows and produces results (maybe only one or even none). The Processor must call onError or onComplete on the Subscriber<T> passed to processor.subscribe. Failure to do so will cause the Connection to hang which is very, very bad.
> 
> I don’t like RowProcessorOperation because it is too easy to end up in a state where the Connection is unable to make progress. If the submitted Processor never calls onError or onComplete on the result Subscriber the Operation never completes and the Connection never makes progress and never closes.
> 
> Douglas
> 
> 
> > On Oct 12, 2017, at 2:25 PM, Viktor Klang <v at lightbend.com <mailto:v at lightbend.com>> wrote:
> >
> >
> >
> > On Thu, Oct 12, 2017 at 10:04 PM, Douglas Surber <douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>> wrote:
> > Forgot to answer “what would a Publisher do about it”.
> >
> > Both cases are programming errors. They are not data errors or caused by an unexpected case. They are strictly the result of an error in program logic. At some level it would be reasonable to throw Error as they should never happen. If it does happen the app is broken and the code must change. So arguably there is no way to recover. The Publisher should abandon whatever request it is handling and try again either on the same request or a new one, hoping that execution does not follow the same erroneous code path.
> >
> > Since we're talking about Processors here it would seem completely appropriate for the Processor to cancel it's upstream Subscription and propagate an IllegalStateException downstream. This would effectively abort processing for the Publisher as well as propagate the exception to downstream dependencies, which could then clean up and exit gracefully—and of course ideally log or otherwise make the developer aware of the problem.
> >
> >
> > Douglas
> >
> >
> > > On Oct 12, 2017, at 12:53 PM, Douglas Surber <douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>> wrote:
> > >
> > > There are two cases for values that would be improper to pass to onNext. In both cases the values are themselves proper. The error is passing them to onNext.
> > >
> > >  1) Passing an Operation that is a member of OperationGroup A to an operationProcessor for OperationGroup B.
> > >
> > >  2) Passing an Operation that has already been submitted.
> > >
> > > Code example:
> > >
> > >  void foo(Connection conn1, Connection conn2) {
> > >    String updateSql = null;
> > >    Flow.Subscription aSubscription = null;
> > >    conn1.countOperation(updateSql).submit(); // this is ok
> > >    Flow.Processor<Operation<Object>, Submission<Object>> p = conn1.operationProcessor();
> > >    p.onSubscribe(aSubscription);
> > >    p.onNext(conn1.countOperation(updateSql)); // this is ok
> > >    conn2.countOperation(updateSql).submit(); // this is ok
> > >    p.onNext(conn2.countOperation(updateSql)); // error case 1. Op created by conn2 passed to Processor for conn1.
> > >    Operation x = conn1.countOperation(updateSql);
> > >    x.submit();
> > >    p.onNext(x);  // error case 2. Op created by conn1 but already submitted
> > >  }
> > >
> > > Note: Connection inherits from OperationGroup.
> > >
> > > Note 2: OperationGroup is a tool for managing errors. By default if an Operation fails all subsequent Operations in the same OperationGroup are skipped, are completed exceptionally with SqlSkippedException. So you can use OperationGroups to control which Operations are skipped and hence when execution resumes. OperationGroups have other capabilities. OperationGroups are Operation factories. Since Connections need to be Operation factories Connections are OperationGroups. There are more or less reasonable use cases for using all of the capabilities of OperationGroup on Connection.
> > >
> > > Note 3: A Connection close Operation is never skipped.
> > >
> > > Douglas
> > >
> > >> On Oct 12, 2017, at 11:20 AM, Viktor Klang <v at lightbend.com <mailto:v at lightbend.com> <mailto:v at lightbend.com <mailto:v at lightbend.com>>> wrote:
> > >>
> > >> Hi Douglas,
> > >>
> > >> Would you mind elaborating a bit about these improper values? I.e. how do they manifest, and what would a Publisher do about it?
> > >>
> > >> (I'm super excited about the prospect of having an async JDBC!)
> > >>
> > >> Best regards,
> > >> √
> > >>
> > >> On Oct 12, 2017 17:56, "Douglas Surber" <douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>>> wrote:
> > >> There are two error cases I need to specify. Each error case occurs when the Publisher calls Subscriber.onNext with an improper value. So far as I can comprehend in both cases the Subscriber calls Subscription.cancel. That’s all well and good but I do not see any way for the Subscriber to report which of the two cases provoked the call to cancel. What am I missing?
> > >>
> > >> Further it is at least locally reasonable that the Subscriber tolerate these improper values, reporting them as errors but allowing the Publisher to send more values anyway. If we did choose to do this I don’t see any way for the Subscriber to inform the Publisher that anything untoward happened at all.
> > >>
> > >> Douglas
> > >>
> > >>
> > >>> On Oct 12, 2017, at 8:20 AM, Konrad “ktoso” Malawski <ktosopl at gmail.com <mailto:ktosopl at gmail.com> <mailto:ktosopl at gmail.com <mailto:ktosopl at gmail.com>> <mailto:ktosopl at gmail.com <mailto:ktosopl at gmail.com> <mailto:ktosopl at gmail.com <mailto:ktosopl at gmail.com>>>> wrote:
> > >>>
> > >>> Hi Douglas, David,
> > >>>
> > >>> After reading the JavaDoc it’s not clear to me that onNext cannot throw. The JavaDoc says “the resulting behavior is not guaranteed” but it doesn’t say that onNext cannot throw. Can you point to something that has more detail?
> > >>>
> > >>>
> > >>> The full set of rules of Reactive Streams, and thus also, Flow.* semantics is
> > >>> available on the reactive streams site here: https://github.com/reactive-streams/reactive-streams-jvm#specification <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_reactive-2Dstreams_reactive-2Dstreams-2Djvm-23specification&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=oePTMeCSKhFcGmsBuATg4fnYIi_sTP95MfF4tF3hy1k&s=M0t2KOW5iKWSToFai4lBphQYnBQoVd7wWzcHHlhiFL4&e=> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_reactive-2Dstreams_reactive-2Dstreams-2Djvm-23specification&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=GQq3mobCwlVytQ06n_vc_nsgONZ7omvbH7VpkhhB9jE&s=U5hSy3n6qC2rPhr9I9lGDbbqiXdslFVBUushU50xJ1g&e= <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_reactive-2Dstreams_reactive-2Dstreams-2Djvm-23specification&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=GQq3mobCwlVytQ06n_vc_nsgONZ7omvbH7VpkhhB9jE&s=U5hSy3n6qC2rPhr9I9lGDbbqiXdslFVBUushU50xJ1g&e=>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_reactive-2Dstreams_reactive-2Dstreams-2Djvm-23specification&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=8PKpeAVkkcN6hA6GJ2zcEVqI3ohcI5138zxnEdjgetA&e= <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_reactive-2Dstreams_reactive-2Dstreams-2Djvm-23specification&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=8PKpeAVkkcN6hA6GJ2zcEVqI3ohcI5138zxnEdjgetA&e=> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_reactive-2Dstreams_reactive-2Dstreams-2Djvm-23specification&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=8PKpeAVkkcN6hA6GJ2zcEVqI3ohcI5138zxnEdjgetA&e= <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_reactive-2Dstreams_reactive-2Dstreams-2Djvm-23specification&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=8PKpeAVkkcN6hA6GJ2zcEVqI3ohcI5138zxnEdjgetA&e=>>>
> > >>>
> > >>> We (specifically; Viktor, initiator of RS, CCed here) have been in talks with Doug Lea to include the actual specification rules
> > >>> in JavaDoc of Flow as we felt this would happen (that people would not realize there’s a full well-defined spec behind it).
> > >>> I hope we’ll be able to get that done, as otherwise many people will be left unaware of the rules that govern Flow implementations.
> > >>> There also is a TCK available <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_reactive-2Dstreams_reactive-2Dstreams-2Djvm_tree_master_tck&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=mGgS1KAtK30Xw6rPq4K6cOFAL4UN3rz6mI8wLqQiUUA&e= <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_reactive-2Dstreams_reactive-2Dstreams-2Djvm_tree_master_tck&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=mGgS1KAtK30Xw6rPq4K6cOFAL4UN3rz6mI8wLqQiUUA&e=> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_reactive-2Dstreams_reactive-2Dstreams-2Djvm_tree_master_tck&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=mGgS1KAtK30Xw6rPq4K6cOFAL4UN3rz6mI8wLqQiUUA&e= <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_reactive-2Dstreams_reactive-2Dstreams-2Djvm_tree_master_tck&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=mGgS1KAtK30Xw6rPq4K6cOFAL4UN3rz6mI8wLqQiUUA&e=>>> as testng tests. I’ve been slow to update it <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_reactive-2Dstreams_reactive-2Dstreams-2Djvm_pull_398&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=xC07YG9iqW5h0mXN1exu72kW4TUTNr_YmhxG94BMKqw&e= <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_reactive-2Dstreams_reactive-2Dstreams-2Djvm_pull_398&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=xC07YG9iqW5h0mXN1exu72kW4TUTNr_YmhxG94BMKqw&e=> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_reactive-2Dstreams_reactive-2Dstreams-2Djvm_pull_398&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=xC07YG9iqW5h0mXN1exu72kW4TUTNr_YmhxG94BMKqw&e= <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_reactive-2Dstreams_reactive-2Dstreams-2Djvm_pull_398&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=xC07YG9iqW5h0mXN1exu72kW4TUTNr_YmhxG94BMKqw&e=>>> to directly work on Flow.* types – very sorry about that, and will try to get it completed very soon so Flow impls can use it directly.
> > >>>
> > >>> Glad to see many RS contributors joining the discussion here, and hopefully we’ll be able to help out making efficient use of Flow in at least some parts of the API.
> > >>>
> > >>>
> > >>> Similar APIs, that might be useful as examples in the discussion (I hope that’s helpful):
> > >>>
> > >>> I have yet to read up on details about what Operation/OperationGroup/Submission etc really mean semantically,
> > >>> but if you want to model a back-pressured stream that back-pressures the incoming operations and emits Submissions (are they like “results”? I’ll read up more),
> > >>> then modeling it as a Processor is indeed a good idea. This is how we have modeled the back-pressured HTTP-client in Akka HTTP, the type is (translated):
> > >>> Processor<HttpRequest, HttpResponse>, which may be backed by multiple connections or by a single one internally, so the pulling in requests is handled appropriately.
> > >>> Looking at the API proposed here it seems that’s what you’re after; so I can confirm we have such API in the wild since some years and it functions well.
> > >>> The API I talk about is here https://github.com/akka/akka-http/blob/master/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala#L380 <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_akka_akka-2Dhttp_blob_master_akka-2Dhttp-2Dcore_src_main_scala_akka_http_javadsl_Http.scala-23L380&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=oePTMeCSKhFcGmsBuATg4fnYIi_sTP95MfF4tF3hy1k&s=I45R3yRR1ZNchuoSjtO1_Wu-0itV5n6uxjCp4cVP35c&e=> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_akka_akka-2Dhttp_blob_master_akka-2Dhttp-2Dcore_src_main_scala_akka_http_javadsl_Http.scala-23L380&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=GQq3mobCwlVytQ06n_vc_nsgONZ7omvbH7VpkhhB9jE&s=tXyJy5d2KOCmopTbLD-3A4TinXEBbfIcrtfSfAId2kg&e= <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_akka_akka-2Dhttp_blob_master_akka-2Dhttp-2Dcore_src_main_scala_akka_http_javadsl_Http.scala-23L380&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=GQq3mobCwlVytQ06n_vc_nsgONZ7omvbH7VpkhhB9jE&s=tXyJy5d2KOCmopTbLD-3A4TinXEBbfIcrtfSfAId2kg&e=>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_akka_akka-2Dhttp_blob_master_akka-2Dhttp-2Dcore_src_main_scala_akka_http_javadsl_Http.scala-23L380&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=Ji0jhIlx0uZcHaIXJmiv3P03oR_O4bq2HzBL1FprLoQ&e= <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_akka_akka-2Dhttp_blob_master_akka-2Dhttp-2Dcore_src_main_scala_akka_http_javadsl_Http.scala-23L380&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=Ji0jhIlx0uZcHaIXJmiv3P03oR_O4bq2HzBL1FprLoQ&e=> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_akka_akka-2Dhttp_blob_master_akka-2Dhttp-2Dcore_src_main_scala_akka_http_javadsl_Http.scala-23L380&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=Ji0jhIlx0uZcHaIXJmiv3P03oR_O4bq2HzBL1FprLoQ&e= <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_akka_akka-2Dhttp_blob_master_akka-2Dhttp-2Dcore_src_main_scala_akka_http_javadsl_Http.scala-23L380&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=Ji0jhIlx0uZcHaIXJmiv3P03oR_O4bq2HzBL1FprLoQ&e=>>> (assume Flow is exactly like a Processor)
> > >>>
> > >>> Another example of reactive-streams with JDBC in action is Slick, and it’s wrappers in Alpakka, here’s examples: https://developer.lightbend.com/docs/alpakka/current/slick.html <https://urldefense.proofpoint.com/v2/url?u=https-3A__developer.lightbend.com_docs_alpakka_current_slick.html&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=oePTMeCSKhFcGmsBuATg4fnYIi_sTP95MfF4tF3hy1k&s=uugMyxCxBQ0xP187pCAHewMgzITyNY5p_rs48ujaGgU&e=> <https://urldefense.proofpoint.com/v2/url?u=https-3A__developer.lightbend.com_docs_alpakka_current_slick.html&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=GQq3mobCwlVytQ06n_vc_nsgONZ7omvbH7VpkhhB9jE&s=_DOxo6WFHb8G1OxQ07m3svqcaKCDR_8R8_ZSQG3v0gI&e= <https://urldefense.proofpoint.com/v2/url?u=https-3A__developer.lightbend.com_docs_alpakka_current_slick.html&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=GQq3mobCwlVytQ06n_vc_nsgONZ7omvbH7VpkhhB9jE&s=_DOxo6WFHb8G1OxQ07m3svqcaKCDR_8R8_ZSQG3v0gI&e=>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__developer.lightbend.com_docs_alpakka_current_slick.html&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=QowbfwYtawd90nnK4Ubzra0xVXUigKlX4jX5YCgSEds&e= <https://urldefense.proofpoint.com/v2/url?u=https-3A__developer.lightbend.com_docs_alpakka_current_slick.html&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=QowbfwYtawd90nnK4Ubzra0xVXUigKlX4jX5YCgSEds&e=> <https://urldefense.proofpoint.com/v2/url?u=https-3A__developer.lightbend.com_docs_alpakka_current_slick.html&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=QowbfwYtawd90nnK4Ubzra0xVXUigKlX4jX5YCgSEds&e= <https://urldefense.proofpoint.com/v2/url?u=https-3A__developer.lightbend.com_docs_alpakka_current_slick.html&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=QowbfwYtawd90nnK4Ubzra0xVXUigKlX4jX5YCgSEds&e=>>>
> > >>> It’s pretty simple, you can click on the Java tab for inspiration (Sink == Subscriber, Source == Publisher, Flow == Processor), and basically the simplest version of an RS integration.
> > >>>
> > >>> Hope this helps,
> > >>> happy to help with further design or general reactive-streams questions (we’re trying to figure out how to help here the most efficiently currently).
> > >>>
> > >>>
> > >>> --
> > >>> Cheers,
> > >>> Konrad 'ktoso <https://urldefense.proofpoint.com/v2/url?u=http-3A__kto.so&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=x4S1kDPFg6JwQ9mRlkvN22YRVOpNwiB-5BgAITRIsv8&e= <https://urldefense.proofpoint.com/v2/url?u=http-3A__kto.so&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=x4S1kDPFg6JwQ9mRlkvN22YRVOpNwiB-5BgAITRIsv8&e=> <https://urldefense.proofpoint.com/v2/url?u=http-3A__kto.so&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=x4S1kDPFg6JwQ9mRlkvN22YRVOpNwiB-5BgAITRIsv8&e= <https://urldefense.proofpoint.com/v2/url?u=http-3A__kto.so&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=x4S1kDPFg6JwQ9mRlkvN22YRVOpNwiB-5BgAITRIsv8&e=>>>' Malawski
> > >>> Akka <https://urldefense.proofpoint.com/v2/url?u=http-3A__akka.io_&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=5b5-GC2k85BKW51yEAecgwQYujsGU-PGmIBOts8xY6w&e= <https://urldefense.proofpoint.com/v2/url?u=http-3A__akka.io_&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=5b5-GC2k85BKW51yEAecgwQYujsGU-PGmIBOts8xY6w&e=> <https://urldefense.proofpoint.com/v2/url?u=http-3A__akka.io_&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=5b5-GC2k85BKW51yEAecgwQYujsGU-PGmIBOts8xY6w&e= <https://urldefense.proofpoint.com/v2/url?u=http-3A__akka.io_&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=5b5-GC2k85BKW51yEAecgwQYujsGU-PGmIBOts8xY6w&e=>>> @ Lightbend <https://urldefense.proofpoint.com/v2/url?u=http-3A__lightbend.com_&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=CBQKBaKMiqEV5R_eUsO3lULxZXmBerKDlw0Gsyw2pG0&e= <https://urldefense.proofpoint.com/v2/url?u=http-3A__lightbend.com_&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=CBQKBaKMiqEV5R_eUsO3lULxZXmBerKDlw0Gsyw2pG0&e=> <https://urldefense.proofpoint.com/v2/url?u=http-3A__lightbend.com_&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=CBQKBaKMiqEV5R_eUsO3lULxZXmBerKDlw0Gsyw2pG0&e= <https://urldefense.proofpoint.com/v2/url?u=http-3A__lightbend.com_&d=DwMFaQ&c=RoP1YumCXCgaWHvlZYR8PQcxBKCX5YTpkKY057SbK10&r=ChRVNBZ3Ru5F7CzL9kG_sNBRUO0uuqD6z6ltcMO-LbA&m=fU-qyMXO_EKnNqN2Bjq1yF0W8Dd6ATBG1CsLuDUF6ks&s=CBQKBaKMiqEV5R_eUsO3lULxZXmBerKDlw0Gsyw2pG0&e=>>>
> > >>> On October 12, 2017 at 23:54:21, Douglas Surber (douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>>) wrote:
> > >>>
> > >>>> After reading the JavaDoc it’s not clear to me that onNext cannot throw. The JavaDoc says “the resulting behavior is not guaranteed” but it doesn’t say that onNext cannot throw. Can you point to something that has more detail?
> > >>>>
> > >>>> Douglas
> > >>>>
> > >>>>
> > >>>>> On Oct 12, 2017, at 7:33 AM, Douglas Surber <douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>>> wrote:
> > >>>>>
> > >>>>> Thanks for the correction with regard to onNext.
> > >>>>>
> > >>>>>> On Oct 11, 2017, at 3:54 PM, Dávid Karnok <akarnokd at gmail.com <mailto:akarnokd at gmail.com> <mailto:akarnokd at gmail.com <mailto:akarnokd at gmail.com>> <mailto:akarnokd at gmail.com <mailto:akarnokd at gmail.com> <mailto:akarnokd at gmail.com <mailto:akarnokd at gmail.com>>> <mailto:akarnokd at gmail.com <mailto:akarnokd at gmail.com> <mailto:akarnokd at gmail.com <mailto:akarnokd at gmail.com>><mailto:akarnokd at gmail.com <mailto:akarnokd at gmail.com> <mailto:akarnokd at gmail.com <mailto:akarnokd at gmail.com>>>>> wrote:
> > >>>>>>
> > >>>>>>> Calling Subscription.onNext with any
> > >>>>>>> * Operation that is not a member of this OperationGroup, that is
> > >>>>>>> * was not created by calling one of the Operation factory methods on this
> > >>>>>>> * OperationGroup, will throw IllegalArgumentException.
> > >>>>>>
> > >>>>>> Processor.onNext must not throw, therefore an invalid onNext item should cancel the Subscription and signal onError to the Publisher side of the Processor.
> > >>>>>>
> > >>>>>>> Still thinking about how to publish rows.
> > >>>>>>
> > >>>>>> In rxjava-jdbc, ResultSet is used as a lambda parameter and the instance shown to the lambda should not escape it, i.e.,
> > >>>>>>
> > >>>>>> <T> Flow.Publisher<T> convert(Function<Row, T> mapper);
> > >>>>>>
> > >>>>>> 2017-10-11 22:05 GMT+02:00 Douglas Surber <douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>><mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>>> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>><mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com> <mailto:douglas.surber at oracle.com <mailto:douglas.surber at oracle.com>>>>>>:
> > >>>>>> I have found what at first glance appears to be a non-hacky way to use Flow to channel Operations to an OperationGroup. It’s not perfect. The limitation that published Operations must be member Operations of the Operation group is unfortunate but absolutely necessary. Otherwise I think it is ok.
> > >>>>>>
> > >>>>>> Delete Connection.onSubscribe.
> > >>>>>>
> > >>>>>> Add the following to OperationGroup:
> > >>>>>>
> > >>>>>> /**
> > >>>>>> * Returns a Flow.Processor that subscribes to a sequence of Operations and
> > >>>>>> * publishes a sequence of corresponding Submissions. The Operations must
> > >>>>>> * be members of this OperationGroup. Calling Subscription.onNext with any
> > >>>>>> * Operation that is not a member of this OperationGroup, that is
> > >>>>>> * was not created by calling one of the Operation factory methods on this
> > >>>>>> * OperationGroup, will throw IllegalArgumentException. The method
> > >>>>>> * Subscription.onNext will call submit on each Operation it is passed
> > >>>>>> * and publish the resulting Submission. Since an Operation can only be
> > >>>>>> * submitted once, submitting an Operation and calling onNext with that
> > >>>>>> * submitted Operation will throw IllegalStateException.
> > >>>>>> *
> > >>>>>> * Each call to this method returns a new Flow.processor. The Submissions
> > >>>>>> * published to each Processor are exactly those generated by calling
> > >>>>>> * submit on the Operations passed as arguments to onNext on the same Processor.
> > >>>>>> *
> > >>>>>> * Note: If any Operation is submitted directly, that is by calling submit
> > >>>>>> * rather than passing it to onNext, the Submission returned by the submit
> > >>>>>> * call will not be published.
> > >>>>>> *
> > >>>>>> * @return a Flow.Processor that accepts Operations and generates Submissions
> > >>>>>> */
> > >>>>>> public Flow.Processor<Operation<T>, Submission<T>> operationProcessor();
> > >>>>>>
> > >>>>>> Comments?
> > >>>>>>
> > >>>>>> Still thinking about how to publish rows.
> > >>>>>>
> > >>>>>>
> > >>>>>> Douglas
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> --
> > >>>>>> Best regards,
> > >>>>>> David Karnok
> > >>
> > >
> >
> >
> >
> >
> > --
> > Cheers,
> > √
> >
> > ——————
> > Viktor Klang
> > Deputy CTO
> > Lightbend, Inc.
> 
> 
> 
> 
> -- 
> Best regards,
> David Karnok



More information about the jdbc-spec-discuss mailing list