Reactive streams and aggregate reads and writes
Viktor Klang
v at lightbend.com
Wed Jan 16 08:15:57 UTC 2019
> * Any bytes between the position and the limit when the call to onNext
> * returns will not be between the position and the limit in the ByteBuffer
> * passed to the next call. The ByteBuffer passed to calls to
> * Subscriber.onNext are valid only for the duration of the call. After a call
> * to onNext returns, using the ByteBuffer passed to that call may yield
> * unexpected results.
I understand what led to that formulation; however, it cannot be true, as
*onNext* is only a *signal*: when it returns, the only thing which is true is
that the Subscriber will receive it, not that it has processed it. Therefore
no claims can be made as to ownership, and it needs to be clarified whether
those ByteBuffers are read-only.
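
To make the ownership concern concrete, here is a minimal sketch of one way a
Subscriber could protect itself if the Publisher were allowed to reuse the
buffer: it copies the bytes before onNext returns and reads through a
duplicate so the caller's position and limit are left alone. The class name
CopyingSubscriber and its chunks() accessor are hypothetical, not part of the
proposal, and this does not settle what the specification should say.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical Subscriber: copies each chunk before onNext returns, so any
// later reuse of the Publisher's buffer cannot affect the stored bytes.
final class CopyingSubscriber implements Flow.Subscriber<ByteBuffer> {
    private final List<byte[]> chunks = new ArrayList<>();
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);
    }

    @Override
    public void onNext(ByteBuffer buf) {
        // duplicate() shares content but has its own position and limit,
        // and it also works when buf is read-only.
        ByteBuffer view = buf.duplicate();
        byte[] copy = new byte[view.remaining()];
        view.get(copy);
        chunks.add(copy); // real processing may happen long after this returns
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) { }

    @Override
    public void onComplete() { }

    List<byte[]> chunks() {
        return chunks;
    }
}
```

The copy costs an allocation per chunk, which is exactly the overhead a
"valid only for the duration of the call" rule tries to avoid; the sketch only
shows what a Subscriber has to do when it cannot rely on owning the buffer.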
> The Subscriber may throw
> * IllegalArgumentException if a ByteBuffer is larger though it is not
> * required to.
No, the Subscriber is not allowed to throw IllegalArgumentException from
*onNext* as per 2.13
<https://github.com/reactive-streams/reactive-streams-jvm#2.13>.
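
As a rough illustration of what rule 2.13 leaves open to the Subscriber, the
sketch below handles an oversized ByteBuffer by cancelling its Subscription
and recording the failure, while onNext still returns normally. The names
BoundedChunkSubscriber, failure() and acceptedBytes() are hypothetical and
only stand in for whatever the Blob implementation would do.

```java
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.Flow;

// Hypothetical Subscriber that enforces a maximum chunk size without
// throwing from onNext: it cancels the Subscription instead.
final class BoundedChunkSubscriber implements Flow.Subscriber<ByteBuffer> {
    private final int chunkSize;
    private Flow.Subscription subscription;
    private Throwable failure;
    private long acceptedBytes;

    BoundedChunkSubscriber(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = Objects.requireNonNull(s);
        s.request(1);
    }

    @Override
    public void onNext(ByteBuffer buf) {
        // A null argument is the one case where rule 2.13 requires throwing.
        Objects.requireNonNull(buf);
        if (failure != null) {
            return; // already cancelled; ignore signals still in flight
        }
        if (buf.remaining() > chunkSize) {
            // Record the problem and cancel rather than throw.
            failure = new IllegalArgumentException(
                "chunk of " + buf.remaining() + " bytes exceeds " + chunkSize);
            subscription.cancel();
            return;
        }
        acceptedBytes += buf.remaining();
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        failure = t;
    }

    @Override
    public void onComplete() { }

    Throwable failure() {
        return failure;
    }

    long acceptedBytes() {
        return acceptedBytes;
    }
}
```

How the recorded failure is surfaced to the application (for example through
the enclosing Operation) is left out here on purpose.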
Besides that, your proposal looks like a good compromise.
On Tue, Jan 15, 2019 at 11:34 PM Douglas Surber <douglas.surber at oracle.com>
wrote:
> How about something like this for Blob?
>
> Douglas
>
> /**
> * Return an Operation the result of which will publish the bytes of this
> * Blob in order from the position at the time the Operation is completed.
> * Subsequent Operations on this Blob <em>will | will not<\em> change the
> * sequence of bytes published.
> *
> * Any bytes between the position and the limit when the call to onNext
> * returns will not be between the position and the limit in the ByteBuffer
> * passed to the next call. The ByteBuffer passed to calls to
> * Subscriber.onNext are valid only for the duration of the call. After a call
> * to onNext returns, using the ByteBuffer passed to that call may yield
> * unexpected results.
> *
> * @param chunkSize the maximum size of the ByteBuffer the Publisher
> * will pass in a call to Subscriber.onNext.
> * @return
> */
> public Operation<Flow.Publisher<ByteBuffer>> publisherOperation(int chunkSize);
>
> /**
> * Return an Operation the result of which is a Subscriber which will insert
> * the bytes published to it into this Blob at the position at the time the
> * Operation is completed. Subsequent Operations on this Blob will not change
> * the position at which subsequent published bytes are inserted.
> *
> * @param chunkSize the maximum size of ByteBuffer the Subscriber
> * expects to receive in a call to onNext. The Subscriber may throw
> * IllegalArgumentException if a ByteBuffer is larger though it is not
> * required to.
> * @return a Subscriber to the bytes of this Blob
> */
> public Operation<Flow.Subscriber<ByteBuffer>> subscriberOperation(int chunkSize);
>
> /**
> * Return the default chunk size for this Blob. This is the default maximum
> * size of the ByteBuffer passed to Subscriber.onNext.
> *
> * @return the default chunk size
> */
> public int defaultChunkSize();
>
> /**
> * Convenience method that create a Publisher of the bytes of this Blob.
> *
> * @return a Publisher of the bytes of this Blob
> */
> public default CompletionStage<Flow.Publisher<ByteBuffer>> publisher() {
>   return publisherOperation(defaultChunkSize()).submit().getCompletionStage();
> }
>
> /**
> * Convenience method that create a Subscriber for bytes in this Blob.
> *
> * @return a Subscriber for inserting bytes into this Blob.
> */
> public default CompletionStage<Flow.Subscriber<ByteBuffer>> subscriber() {
>   return subscriberOperation(defaultChunkSize()).submit().getCompletionStage();
> }
>
>
--
Cheers,
√
*Deputy Chief Technology Officer, Lightbend, Inc.*
v at lightbend.com
@viktorklang <https://twitter.com/viktorklang>
<https://www.lightbend.com>