Reactive streams and aggregate reads and writes

Dávid Karnok akarnokd at gmail.com
Wed Jan 16 15:18:03 UTC 2019


The linked sources:

/**
* Sets a parameter to contain the sequence of bytes provided via a sequence
* of ByteBuffers and each spent ByteBuffer is then sent to an user-provided
* Consumer.
* <ul>
* <li>The consumption of this Flow.Publisher happens only when this
* JdbcOperation executes.</li>
* <li>If the Flow.Publisher fails, the associated JdbcStatement will
* indicate this failure.</li>
* <li>This allows to feed the JdbcOperation/JdbcStatement to be fed
* by pooled ByteBuffers that when fully consumed can be reused later.</li>
* </ul>
* @param id the parameter identifier
* @param buffers the Flow.Publisher providing the ByteBuffers
* @param release called with the ByteBuffer received through the
Flow.Publisher
* @return this Builder
*/
Builder parameterBuffer(String id, Flow.Publisher<ByteBuffer> buffers,
Consumer<ByteBuffer> release);


/**
* Returns the contents of a column as a sequence of ByteBuffers (or
subclasses) provided by
* an supplier function.
* <ul>
* <li>The returned Flow.Publisher may or may not allow multiple
Subscribers.</li>
* <li>The capacity shown to the bufferSupplier function is a hint for
sizing the
* ByteBuffers returned. The driver should adapt to the actual capacity of
each individual
* ByteBuffers returned by the function.</li>
* </ul>
* @param column the column identifier
* @param bufferSupplier the function that receives a capacity value and
should return a
* ByteBuffer subclass that will be filled in.
* @param <B> the ByteBuffer subclass
* @return the Flow.Publisher that will emit the contents as custom
ByteBuffers
* @throws IllegalArgumentException if the column is missing or the driver
doesn't
* support retrieving that column as a sequence of ByteBuffers.
*/
<B extends ByteBuffer> Flow.Publisher<B> getBuffer(int column, IntFunction<B
> bufferSupplier);

Viktor Klang <v at lightbend.com> ezt írta (időpont: 2019. jan. 16., Sze,
9:16):

> >  * Any bytes between the position and the limit when the call to onNext
>    * returns will not be between the position and the limit in the
> ByteBuffer
>    * passed to the next call. The ByteBuffer passed to calls to
>    * Subscriber.onNext are valid only for the duration of the call. After a
> call
>    * to onNext returns, using the ByteBuffer passed to that call may yield
>    * unexpected results.
>
> I understand what led to that formulation, however, it cannot be true as
> *onNext* is only a *signal*—when it returns the only thing which is true is
> that the Subscriber will receive it, not that it has processed it. Therefor
> no claims can be made as to ownership—and it needs to be clarified whether
> those ByteBuffers are readOnly.
>
> >The Subscriber may throw
>    * IllegalArgumentException if a ByteBuffer is larger though it is not
>    * required to.
>
> No, the Subscriber is not allowed to throw IllegalArgumentException from
> *onNext* as per 2.13
> <https://github.com/reactive-streams/reactive-streams-jvm#2.13>.
>
> Besides that, your proposal looks like a good compromise.
>
> On Tue, Jan 15, 2019 at 11:34 PM Douglas Surber <douglas.surber at oracle.com
> >
> wrote:
>
> > How about something like this for Blob?
> >
> > Douglas
> >
> >  /**
> >    * Return an Operation the result of which will publish the bytes of
> this
> >    * Blob in order from the position at the time the Operation is
> > completed.
> >    * Subsequent Operations on this Blob <em>will | will not<\em> change
> > the
> >    * sequence of bytes published.
> >    *
> >    * Any bytes between the position and the limit when the call to onNext
> >    * returns will not be between the position and the limit in the
> > ByteBuffer
> >    * passed to the next call. The ByteBuffer passed to calls to
> >    * Subscriber.onNext are valid only for the duration of the call. After
> > a call
> >    * to onNext returns, using the ByteBuffer passed to that call may
> yield
> >    * unexpected results.
> >    *
> >    * @param chunkSize the maximum size of the ByteBuffer the Publisher
> >    * will pass in a call to Subscriber.onNext.
> >    * @return
> >    */
> >   public Operation<Flow.Publisher<ByteBuffer>> publisherOperation(int
> > chunkSize);
> >
> >   /**
> >    * Return an Operation the result of which is a Subscriber which will
> > insert
> >    * the bytes published to it into this Blob at the position at the time
> > the
> >    * Operation is completed. Subsequent Operations on this Blob will not
> > change
> >    * the position at which subsequent published bytes are inserted.
> >    *
> >    * @param chunkSize the maximum size of ByteBuffer the Subscriber
> >    * expects to receive in a call to onNext. The Subscriber may throw
> >    * IllegalArgumentException if a ByteBuffer is larger though it is not
> >    * required to.
> >    * @return a Subscriber to the bytes of this Blob
> >    */
> >   public Operation<Flow.Subscriber<ByteBuffer>> subscriberOperation(int
> > chunkSize);
> >
> >   /**
> >    * Return the default chunk size for this Blob. This is the default
> > maximum size of
> >    * the ByteBuffer passed to Subscriber.onNext.
> >    *
> >    * @return the default chunk size
> >    */
> >   public int defaultChunkSize();
> >
> >   /**
> >    * Convenience method that create a Publisher of the bytes of this
> Blob.
> >    *
> >    * @return a Publisher of the bytes of this Blob
> >    */
> >   public default CompletionStage<Flow.Publisher<ByteBuffer>> publisher()
> {
> >     return
> > publisherOperation(defaultChunkSize()).submit().getCompletionStage();
> >   }
> >
> >   /**
> >    * Convenience method that create a Subscriber for bytes in this Blob.
> >    *
> >    * @return a Subscriber for inserting  bytes into this Blob.
> >    */
> >   public default CompletionStage<Flow.Subscriber<ByteBuffer>>
> subscriber()
> > {
> >     return
> > subscriberOperation(defaultChunkSize()).submit().getCompletionStage();
> >   }
> >
> >
>
> --
> Cheers,
>>
> *Deputy Chief Technology Officer, Lightbend, Inc.*
> v at lightbend.com
> @viktorklang <https://twitter.com/viktorklang>
>
> <https://www.lightbend.com>
>


-- 
Best regards,
David Karnok


More information about the jdbc-spec-discuss mailing list