Reactive streams and aggregate reads and writes

Dávid Karnok akarnokd at gmail.com
Tue Jan 15 08:22:50 UTC 2019


I'm not sure how others have solved this, if at all. R2DBC does not, but
maybe Reactor-Netty, Reactor-IO or RSocket do on some level.

I suggested an API quite some time ago which uses ByteBuffers whose
creation can be controlled by the user:

1) When the client sends data, it provides a Publisher<ByteBuffer>. Those
buffers are controlled by the caller. Note that it also offers a release
mechanism for pooled buffers:


   - https://github.com/akarnokd/akarnokd-misc-java9/blob/master/src/main/java/hu/akarnokd/java9/jdbc/JdbcOperation.java#L85



2) When the client receives data, it can provide a factory for ByteBuffers
which will be filled in by the driver.

   - https://github.com/akarnokd/akarnokd-misc-java9/blob/master/src/main/java/hu/akarnokd/java9/jdbc/JdbcRow.java#L60


3) Also, each driver can apply a default batching for the more convenient
overloads that use byte[] or driver-provided ByteBuffers.
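
To make points 1) and 2) a bit more concrete, here is a minimal sketch. It
is not the API from the linked files: ChunkedBytes, fill, publisher and
consume are hypothetical names made up for this illustration, and the
publisher is single-threaded and synchronous, so it is not a fully
spec-compliant Flow.Publisher. The idea it shows is that request(n) still
counts buffers, while the amount of bytes per buffer is bounded by the chunk
size and by whatever the user-supplied factory hands out. The sketch assumes
the factory returns a buffer with capacity of at least the requested length.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import java.util.function.IntFunction;

public class ChunkedBytes {

    // Hypothetical helper: copies data into buffers obtained from the caller's factory.
    static List<ByteBuffer> fill(byte[] data, int chunkSize, IntFunction<ByteBuffer> factory) {
        List<ByteBuffer> out = new ArrayList<>();
        for (int off = 0; off < data.length; off += chunkSize) {
            int len = Math.min(chunkSize, data.length - off);
            ByteBuffer buf = factory.apply(len); // caller decides: heap, direct or pooled
            buf.clear();                         // a pooled buffer may carry old position/limit
            buf.put(data, off, len);
            buf.flip();                          // make the written bytes readable
            out.add(buf);
        }
        return out;
    }

    // Simplified synchronous publisher; assumes a single thread and a single subscriber.
    static Flow.Publisher<ByteBuffer> publisher(List<ByteBuffer> chunks) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int index;        // next chunk to emit
            long requested;   // outstanding demand, counted in buffers
            boolean draining; // true while the emission loop is running
            boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("n must be positive"));
                    return;
                }
                requested += n;
                if (requested < 0) {
                    requested = Long.MAX_VALUE; // cap on overflow
                }
                if (draining) {
                    return; // reentrant call from onNext; the outer loop picks it up
                }
                draining = true;
                while (!done && requested > 0 && index < chunks.size()) {
                    requested--;
                    subscriber.onNext(chunks.get(index++));
                }
                draining = false;
                if (!done && index == chunks.size()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Requests one buffer at a time, sums the bytes, and hands each buffer to the release hook.
    static long consume(Flow.Publisher<ByteBuffer> source, Consumer<ByteBuffer> release) {
        long[] total = {0};
        source.subscribe(new Flow.Subscriber<ByteBuffer>() {
            Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            @Override public void onNext(ByteBuffer b) {
                total[0] += b.remaining();
                release.accept(b);       // e.g. return the buffer to a pool
                subscription.request(1);
            }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return total[0]; // valid here only because the publisher above is synchronous
    }

    public static void main(String[] args) {
        List<ByteBuffer> released = new ArrayList<>();
        List<ByteBuffer> chunks = fill(new byte[40_000], 16 * 1024, ByteBuffer::allocate);
        long total = consume(publisher(chunks), released::add);
        System.out.println(chunks.size() + " buffers, " + total + " bytes, "
                + released.size() + " released");
    }
}
```

With a 16k chunk size, each request(1) delivers at most 16k bytes, so the
subscriber gets a predictable upper bound per onNext without changing what
request(n) means. Swapping ByteBuffer::allocate for a pool-backed factory,
and released::add for a return-to-pool callback, would be the pooled variant.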




Douglas Surber <douglas.surber at oracle.com> wrote (on Tue, Jan 15, 2019,
2:58):

> Given a reactive source/sink of bytes or chars what is the best way to
> subscribe/publish byte[]/String?
>
> When working with an Input/OutputStream or a Reader/Writer it is easy to
> define methods that read both single bytes/chars and byte[]s/Strings. But
> it is not obvious to me how to do the same with Flow. If one declares a
> Flow.Publisher<Byte> then the bytes are processed one at a time. In high
> throughput systems that doesn't work. It is much better to read byte[]s,
> preferably large ones, on each call rather than single bytes. This can make
> a really big difference in performance. Same with processing individual
> chars as opposed to Strings. On the other hand, sometimes processing only a
> single byte/char is the right thing.
>
> Let's consider a Flow.Publisher<byte[]>. If the subscriber requests 1
> element, the Subscriber has no idea how many bytes that element will
> contain. It could be 1 byte or it could be 1G byte. What the Subscriber
> wants to do is say "give me 16k bytes" and get one or more calls to next
> where the total length of all the byte[]s passed to next is 16k, not 16k
> calls to next. This clearly violates the contract for
> Flow.Subscription.request.
>
> I can't be the first person to have this question, but my Google-fu is not
> sufficient to find an answer. I think the problem is I don't know the
> reactive stream terminology necessary to pose the question.
>
> The particular use case I'm working on is Blob/Clob. It would be easy
> enough for Blob/Clob to provide publish/subscribe methods that return
> Flow.Publisher<Byte/Char> and Flow.Subscriber<Byte/Char>, but that's not
> going to be performant. On the other hand Flow.Publisher<byte[]/String> and
> Flow.Subscriber<byte[]/String> don't really do the right thing.
>
> Suggestions? How do other reactive stream users address this issue?
>
> Douglas



-- 
Best regards,
David Karnok


More information about the jdbc-spec-discuss mailing list