Flow.Processor for Operation submission

Konrad “ktoso” Malawski ktosopl at gmail.com
Thu Oct 12 15:20:45 UTC 2017


Hi Douglas, David,

> After reading the JavaDoc it’s not clear to me that onNext cannot throw.
> The JavaDoc says “the resulting behavior is not guaranteed” but it doesn’t
> say that onNext cannot throw. Can you point to something that has more
> detail?


The full set of rules of Reactive Streams, and thus also the Flow.* semantics,
is available in the Reactive Streams specification here:
https://github.com/reactive-streams/reactive-streams-jvm#specification
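
To make the onNext question concrete: as far as I read the specification,
rule 2.13 says that onSubscribe, onNext, onError and onComplete must return
normally, except that they throw NullPointerException when handed null; in
all other cases the only legal way for a Subscriber to signal failure is to
cancel its Subscription. Below is a minimal Java sketch of a Processor that
follows this rule. ValidatingProcessor, its isValid check and the demo class
are hypothetical names made up for illustration, not part of the proposed
API. The sketch is single-threaded, supports one downstream subscriber, and
assumes onSubscribe is called before subscribe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

/** Hypothetical sketch: rejects invalid items without throwing from onNext. */
final class ValidatingProcessor<T> implements Flow.Processor<T, T> {
    private final Predicate<T> isValid;            // hypothetical validity check
    private Flow.Subscription upstream;            // set by onSubscribe
    private Flow.Subscriber<? super T> downstream; // set by subscribe
    private boolean done;

    ValidatingProcessor(Predicate<T> isValid) { this.isValid = isValid; }

    @Override public void onSubscribe(Flow.Subscription s) { upstream = s; }

    @Override public void subscribe(Flow.Subscriber<? super T> s) {
        downstream = s;
        // Demand and cancellation from downstream are passed straight upstream.
        s.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { upstream.request(n); }
            @Override public void cancel() { upstream.cancel(); }
        });
    }

    @Override public void onNext(T item) {
        // null is the one case in which the spec requires onNext to throw.
        if (item == null) throw new NullPointerException("item");
        if (done) return;
        if (!isValid.test(item)) {
            // Invalid item: cancel upstream and signal onError downstream
            // instead of throwing to the caller.
            done = true;
            upstream.cancel();
            downstream.onError(new IllegalArgumentException("invalid item: " + item));
            return;
        }
        downstream.onNext(item);
    }

    @Override public void onError(Throwable t) {
        if (!done) { done = true; downstream.onError(t); }
    }

    @Override public void onComplete() {
        if (!done) { done = true; downstream.onComplete(); }
    }
}

/** Drives the processor by hand and records every signal it produces. */
final class ValidatingProcessorDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ValidatingProcessor<String> p =
            new ValidatingProcessor<String>(s -> s.startsWith("op"));
        // Fake upstream that only records what the processor asks of it.
        p.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { log.add("request(" + n + ")"); }
            @Override public void cancel() { log.add("cancel"); }
        });
        p.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(2); }
            @Override public void onNext(String item) { log.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) {
                log.add("onError(" + t.getClass().getSimpleName() + ")");
            }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        p.onNext("op-1");  // valid, forwarded downstream
        p.onNext("bogus"); // invalid, no exception escapes onNext
        return log;
    }
}
```

Calling ValidatingProcessorDemo.run() records the downstream demand, the
forwarded item, the upstream cancel, and finally the onError signal. The
second onNext call returns normally, so the failure reaches the downstream
Subscriber rather than the caller of onNext.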

We (specifically Viktor, initiator of RS, CCed here) have been in talks
with Doug Lea to include the actual specification rules in the JavaDoc of
Flow, as we expected this to happen (that people would not realize there’s
a full, well-defined spec behind it).
I hope we’ll be able to get that done, as otherwise many people will be
left unaware of the rules that govern Flow implementations.
There is also a TCK available as TestNG tests:
https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck
I’ve been slow to update it to work directly on Flow.* types
(https://github.com/reactive-streams/reactive-streams-jvm/pull/398); very
sorry about that. I will try to get it completed very soon so Flow impls
can use it directly.

Glad to see many RS contributors joining the discussion here, and hopefully
we’ll be able to help out making efficient use of Flow in at least some
parts of the API.


Similar APIs, that might be useful as examples in the discussion (I hope
that’s helpful):

I have yet to read up on what Operation/OperationGroup/Submission etc.
really mean semantically, but if you want to model a back-pressured stream
that back-pressures the incoming operations and emits Submissions (are they
like “results”? I’ll read up more), then modeling it as a Processor is
indeed a good idea. This is how we have modeled the back-pressured HTTP
client in Akka HTTP; the type is (translated)
Processor<HttpRequest, HttpResponse>, which may be backed internally by
multiple connections or by a single one, so that pulling in requests is
handled appropriately.
Looking at the API proposed here, it seems that’s what you’re after, so I
can confirm we have had such an API in the wild for some years and it
functions well.
The API I’m talking about is here (assume Flow is exactly like a Processor):
https://github.com/akka/akka-http/blob/master/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala#L380
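
The Processor<HttpRequest, HttpResponse> shape described above can be
sketched with plain Flow types roughly as follows. This is only an
illustration of the idea, not how Akka HTTP implements it: MappingProcessor,
its handler function and the String request/response types are hypothetical
stand-ins, and the sketch is single-threaded with one downstream subscriber
and assumes onSubscribe is called before subscribe. The point it shows is
that a request is only pulled from upstream when downstream has asked for a
response.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

/** Hypothetical sketch: turns each incoming request into one response. */
final class MappingProcessor<I, O> implements Flow.Processor<I, O> {
    private final Function<I, O> handler;          // hypothetical request handler
    private Flow.Subscription upstream;            // set by onSubscribe
    private Flow.Subscriber<? super O> downstream; // set by subscribe
    private boolean done;

    MappingProcessor(Function<I, O> handler) { this.handler = handler; }

    @Override public void onSubscribe(Flow.Subscription s) { upstream = s; }

    @Override public void subscribe(Flow.Subscriber<? super O> s) {
        downstream = s;
        // One response per request, so downstream demand maps 1:1 to
        // upstream demand: nothing is pulled in until it is asked for.
        s.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { upstream.request(n); }
            @Override public void cancel() { upstream.cancel(); }
        });
    }

    @Override public void onNext(I request) {
        if (request == null) throw new NullPointerException("request");
        if (done) return;
        O response;
        try {
            response = handler.apply(request);
        } catch (RuntimeException e) {
            // A failing handler must not make onNext throw.
            done = true;
            upstream.cancel();
            downstream.onError(e);
            return;
        }
        downstream.onNext(response);
    }

    @Override public void onError(Throwable t) {
        if (!done) { done = true; downstream.onError(t); }
    }

    @Override public void onComplete() {
        if (!done) { done = true; downstream.onComplete(); }
    }
}

/** Drives the processor by hand and records every signal it produces. */
final class MappingProcessorDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MappingProcessor<String, String> client =
            new MappingProcessor<String, String>(req -> "200 OK for " + req);
        // Fake upstream that only records the demand it receives.
        client.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { log.add("upstream asked for " + n); }
            @Override public void cancel() { log.add("upstream cancelled"); }
        });
        client.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String r) { log.add("response: " + r); }
            @Override public void onError(Throwable t) { log.add("error: " + t); }
            @Override public void onComplete() { log.add("complete"); }
        });
        client.onNext("GET /a"); // the single request that was asked for
        return log;
    }
}
```

In MappingProcessorDemo.run() the fake upstream sees demand for exactly one
element, because the downstream subscriber requested one response. A real
implementation backed by connections would decide how much to request based
on its own capacity, but the Processor type seen by users stays the same.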

Another example of Reactive Streams with JDBC in action is Slick and its
wrappers in Alpakka; here are examples:
https://developer.lightbend.com/docs/alpakka/current/slick.html
It’s pretty simple, and basically the simplest version of an RS integration.
You can click on the Java tab for inspiration (Sink == Subscriber,
Source == Publisher, Flow == Processor).

Hope this helps,
happy to help with further design or general reactive-streams questions
(we’re currently trying to figure out how to help here most efficiently).


-- 
Cheers,
Konrad 'ktoso <http://kto.so>' Malawski
Akka <http://akka.io/> @ Lightbend <http://lightbend.com/>

On October 12, 2017 at 23:54:21, Douglas Surber (douglas.surber at oracle.com)
wrote:

After reading the JavaDoc it’s not clear to me that onNext cannot throw.
The JavaDoc says “the resulting behavior is not guaranteed” but it doesn’t
say that onNext cannot throw. Can you point to something that has more
detail?

Douglas


> On Oct 12, 2017, at 7:33 AM, Douglas Surber <douglas.surber at oracle.com> wrote:
>
> Thanks for the correction with regard to onNext.
>
>> On Oct 11, 2017, at 3:54 PM, Dávid Karnok <akarnokd at gmail.com> wrote:
>>
>>> Calling Subscription.onNext with any
>>> * Operation that is not a member of this OperationGroup, that is
>>> * was not created by calling one of the Operation factory methods on this
>>> * OperationGroup, will throw IllegalArgumentException.
>>
>> Processor.onNext must not throw, therefore an invalid onNext item should
>> cancel the Subscription and signal onError to the Publisher side of the
>> Processor.
>>
>>> Still thinking about how to publish rows.
>>
>> In rxjava-jdbc, ResultSet is used as a lambda parameter and the instance
>> shown to the lambda should not escape it, i.e.,
>>
>> <T> Flow.Publisher<T> convert(Function<Row, T> mapper);
>>
>> 2017-10-11 22:05 GMT+02:00 Douglas Surber <douglas.surber at oracle.com>:
>> I have found what at first glance appears to be a non-hacky way to use
>> Flow to channel Operations to an OperationGroup. It’s not perfect. The
>> limitation that published Operations must be member Operations of the
>> Operation group is unfortunate but absolutely necessary. Otherwise I think
>> it is ok.
>>
>> Delete Connection.onSubscribe.
>>
>> Add the following to OperationGroup:
>>
>> /**
>> * Returns a Flow.Processor that subscribes to a sequence of Operations and
>> * publishes a sequence of corresponding Submissions. The Operations must
>> * be members of this OperationGroup. Calling Subscription.onNext with any
>> * Operation that is not a member of this OperationGroup, that is
>> * was not created by calling one of the Operation factory methods on this
>> * OperationGroup, will throw IllegalArgumentException. The method
>> * Subscription.onNext will call submit on each Operation it is passed
>> * and publish the resulting Submission. Since an Operation can only be
>> * submitted once, submitting an Operation and calling onNext with that
>> * submitted Operation will throw IllegalStateException.
>> *
>> * Each call to this method returns a new Flow.processor. The Submissions
>> * published to each Processor are exactly those generated by calling
>> * submit on the Operations passed as arguments to onNext on the same
>> * Processor.
>> *
>> * Note: If any Operation is submitted directly, that is by calling submit
>> * rather than passing it to onNext, the Submission returned by the submit
>> * call will not be published.
>> *
>> * @return a Flow.Processor that accepts Operations and generates
>> * Submissions
>> */
>> public Flow.Processor<Operation<T>, Submission<T>> operationProcessor();
>>
>> Comments?
>>
>> Still thinking about how to publish rows.
>>
>>
>> Douglas
>>
>>
>>
>> --
>> Best regards,
>> David Karnok


More information about the jdbc-spec-discuss mailing list