Reactive streams and aggregate reads and writes
Dávid Karnok
akarnokd at gmail.com
Wed Jan 16 15:18:03 UTC 2019
The linked sources:
/**
* Sets a parameter to contain the sequence of bytes provided via a sequence
* of ByteBuffers and each spent ByteBuffer is then sent to an user-provided
* Consumer.
* <ul>
* <li>The consumption of this Flow.Publisher happens only when this
* JdbcOperation executes.</li>
* <li>If the Flow.Publisher fails, the associated JdbcStatement will
* indicate this failure.</li>
* <li>This allows to feed the JdbcOperation/JdbcStatement to be fed
* by pooled ByteBuffers that when fully consumed can be reused later.</li>
* </ul>
* @param id the parameter identifier
* @param buffers the Flow.Publisher providing the ByteBuffers
* @param release called with the ByteBuffer received through the
Flow.Publisher
* @return this Builder
*/
Builder parameterBuffer(String id, Flow.Publisher<ByteBuffer> buffers,
Consumer<ByteBuffer> release);
/**
* Returns the contents of a column as a sequence of ByteBuffers (or
subclasses) provided by
* an supplier function.
* <ul>
* <li>The returned Flow.Publisher may or may not allow multiple
Subscribers.</li>
* <li>The capacity shown to the bufferSupplier function is a hint for
sizing the
* ByteBuffers returned. The driver should adapt to the actual capacity of
each individual
* ByteBuffers returned by the function.</li>
* </ul>
* @param column the column identifier
* @param bufferSupplier the function that receives a capacity value and
should return a
* ByteBuffer subclass that will be filled in.
* @param <B> the ByteBuffer subclass
* @return the Flow.Publisher that will emit the contents as custom
ByteBuffers
* @throws IllegalArgumentException if the column is missing or the driver
doesn't
* support retrieving that column as a sequence of ByteBuffers.
*/
<B extends ByteBuffer> Flow.Publisher<B> getBuffer(int column, IntFunction<B
> bufferSupplier);
Viktor Klang <v at lightbend.com> ezt írta (időpont: 2019. jan. 16., Sze,
9:16):
> > * Any bytes between the position and the limit when the call to onNext
> * returns will not be between the position and the limit in the
> ByteBuffer
> * passed to the next call. The ByteBuffer passed to calls to
> * Subscriber.onNext are valid only for the duration of the call. After a
> call
> * to onNext returns, using the ByteBuffer passed to that call may yield
> * unexpected results.
>
> I understand what led to that formulation, however, it cannot be true as
> *onNext* is only a *signal*—when it returns the only thing which is true is
> that the Subscriber will receive it, not that it has processed it. Therefor
> no claims can be made as to ownership—and it needs to be clarified whether
> those ByteBuffers are readOnly.
>
> >The Subscriber may throw
> * IllegalArgumentException if a ByteBuffer is larger though it is not
> * required to.
>
> No, the Subscriber is not allowed to throw IllegalArgumentException from
> *onNext* as per 2.13
> <https://github.com/reactive-streams/reactive-streams-jvm#2.13>.
>
> Besides that, your proposal looks like a good compromise.
>
> On Tue, Jan 15, 2019 at 11:34 PM Douglas Surber <douglas.surber at oracle.com
> >
> wrote:
>
> > How about something like this for Blob?
> >
> > Douglas
> >
> > /**
> > * Return an Operation the result of which will publish the bytes of
> this
> > * Blob in order from the position at the time the Operation is
> > completed.
> > * Subsequent Operations on this Blob <em>will | will not<\em> change
> > the
> > * sequence of bytes published.
> > *
> > * Any bytes between the position and the limit when the call to onNext
> > * returns will not be between the position and the limit in the
> > ByteBuffer
> > * passed to the next call. The ByteBuffer passed to calls to
> > * Subscriber.onNext are valid only for the duration of the call. After
> > a call
> > * to onNext returns, using the ByteBuffer passed to that call may
> yield
> > * unexpected results.
> > *
> > * @param chunkSize the maximum size of the ByteBuffer the Publisher
> > * will pass in a call to Subscriber.onNext.
> > * @return
> > */
> > public Operation<Flow.Publisher<ByteBuffer>> publisherOperation(int
> > chunkSize);
> >
> > /**
> > * Return an Operation the result of which is a Subscriber which will
> > insert
> > * the bytes published to it into this Blob at the position at the time
> > the
> > * Operation is completed. Subsequent Operations on this Blob will not
> > change
> > * the position at which subsequent published bytes are inserted.
> > *
> > * @param chunkSize the maximum size of ByteBuffer the Subscriber
> > * expects to receive in a call to onNext. The Subscriber may throw
> > * IllegalArgumentException if a ByteBuffer is larger though it is not
> > * required to.
> > * @return a Subscriber to the bytes of this Blob
> > */
> > public Operation<Flow.Subscriber<ByteBuffer>> subscriberOperation(int
> > chunkSize);
> >
> > /**
> > * Return the default chunk size for this Blob. This is the default
> > maximum size of
> > * the ByteBuffer passed to Subscriber.onNext.
> > *
> > * @return the default chunk size
> > */
> > public int defaultChunkSize();
> >
> > /**
> > * Convenience method that create a Publisher of the bytes of this
> Blob.
> > *
> > * @return a Publisher of the bytes of this Blob
> > */
> > public default CompletionStage<Flow.Publisher<ByteBuffer>> publisher()
> {
> > return
> > publisherOperation(defaultChunkSize()).submit().getCompletionStage();
> > }
> >
> > /**
> > * Convenience method that create a Subscriber for bytes in this Blob.
> > *
> > * @return a Subscriber for inserting bytes into this Blob.
> > */
> > public default CompletionStage<Flow.Subscriber<ByteBuffer>>
> subscriber()
> > {
> > return
> > subscriberOperation(defaultChunkSize()).submit().getCompletionStage();
> > }
> >
> >
>
> --
> Cheers,
> √
>
> *Deputy Chief Technology Officer, Lightbend, Inc.*
> v at lightbend.com
> @viktorklang <https://twitter.com/viktorklang>
>
> <https://www.lightbend.com>
>
--
Best regards,
David Karnok
More information about the jdbc-spec-discuss
mailing list